From 7298c07844955d841ea7d9cdd5434c9b58d0ef4d Mon Sep 17 00:00:00 2001 From: Yordis Prieto Date: Wed, 4 Dec 2024 17:03:00 -0500 Subject: [PATCH] feat: add opentelemetry_commanded Co-authored-by: Lee Eggebroteni Co-authored-by: Dave Lucia Co-authored-by: Santiago Cabrera --- .github/workflows/elixir.yml | 39 ++++ .../opentelemetry_commanded/.formatter.exs | 4 + .../opentelemetry_commanded/.gitignore | 24 +++ .../opentelemetry_commanded/LICENSE | 201 ++++++++++++++++++ .../opentelemetry_commanded/README.md | 43 ++++ .../lib/commanded/aggregate.ex | 98 +++++++++ .../lib/commanded/application.ex | 68 ++++++ .../lib/commanded/event_handler.ex | 100 +++++++++ .../lib/commanded/event_store.ex | 107 ++++++++++ .../lib/commanded/middleware.ex | 36 ++++ .../lib/commanded/process_manager.ex | 104 +++++++++ .../lib/commanded/util.ex | 35 +++ .../lib/opentelemetry_commanded.ex | 11 + .../opentelemetry_commanded/mix.exs | 69 ++++++ .../opentelemetry_commanded/mix.lock | 20 ++ .../test/commanded/aggregate_test.exs | 149 +++++++++++++ .../test/commanded/application_test.exs | 131 ++++++++++++ .../test/commanded/event_handler_test.exs | 158 ++++++++++++++ .../test/commanded/event_store_test.exs | 61 ++++++ .../test/commanded/process_manager_test.exs | 160 ++++++++++++++ .../test/dummy_app/aggregate.ex | 10 + .../test/dummy_app/app.ex | 16 ++ .../dummy_app/command_validator_middleware.ex | 23 ++ .../test/dummy_app/commands.ex | 27 +++ .../test/dummy_app/event_handler.ex | 17 ++ .../test/dummy_app/events.ex | 28 +++ .../test/dummy_app/handler.ex | 13 ++ .../test/dummy_app/process_manager.ex | 36 ++++ .../test/dummy_app/router.ex | 19 ++ .../test/opentelemetry_commanded_test.exs | 14 ++ .../test/support/commanded_case.ex | 126 +++++++++++ .../test/test_helper.exs | 1 + 32 files changed, 1948 insertions(+) create mode 100644 instrumentation/opentelemetry_commanded/.formatter.exs create mode 100644 instrumentation/opentelemetry_commanded/.gitignore create mode 100644 instrumentation/opentelemetry_commanded/LICENSE create mode 100644 instrumentation/opentelemetry_commanded/README.md create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/aggregate.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/application.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/event_handler.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/event_store.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/middleware.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/process_manager.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/commanded/util.ex create mode 100644 instrumentation/opentelemetry_commanded/lib/opentelemetry_commanded.ex create mode 100644 instrumentation/opentelemetry_commanded/mix.exs create mode 100644 instrumentation/opentelemetry_commanded/mix.lock create mode 100644 instrumentation/opentelemetry_commanded/test/commanded/aggregate_test.exs create mode 100644 instrumentation/opentelemetry_commanded/test/commanded/application_test.exs create mode 100644 instrumentation/opentelemetry_commanded/test/commanded/event_handler_test.exs create mode 100644 instrumentation/opentelemetry_commanded/test/commanded/event_store_test.exs create mode 100644 instrumentation/opentelemetry_commanded/test/commanded/process_manager_test.exs create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/aggregate.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/app.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/command_validator_middleware.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/commands.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/event_handler.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/events.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/handler.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/process_manager.ex create mode 100644 instrumentation/opentelemetry_commanded/test/dummy_app/router.ex create mode 100644 instrumentation/opentelemetry_commanded/test/opentelemetry_commanded_test.exs create mode 100644 instrumentation/opentelemetry_commanded/test/support/commanded_case.ex create mode 100644 instrumentation/opentelemetry_commanded/test/test_helper.exs diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 5f39b512..505dba6c 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -651,3 +651,42 @@ jobs: if: matrix.check_formatted - name: Test run: mix test + + opentelemetry-commanded: + needs: [test-matrix] + if: (contains(github.event.pull_request.labels.*.name, 'elixir') && contains(github.event.pull_request.labels.*.name, 'opentelemetry_commanded')) + env: + app: "opentelemetry_commanded" + defaults: + run: + working-directory: instrumentation/${{ env.app }} + runs-on: ubuntu-24.04 + name: Opentelemetry Commanded test on Elixir ${{ matrix.elixir_version }} (OTP ${{ matrix.otp_version }}) + strategy: + fail-fast: false + matrix: ${{ fromJson(needs.test-matrix.outputs.matrix) }} + steps: + - uses: actions/checkout@v4 + - uses: erlef/setup-beam@v1 + with: + version-type: strict + otp-version: ${{ matrix.otp_version }} + elixir-version: ${{ matrix.elixir_version }} + rebar3-version: ${{ matrix.rebar3_version }} + - name: Cache + uses: actions/cache@v4 + with: + path: | + ~/deps + ~/_build + key: ${{ runner.os }}-build-${{ matrix.otp_version }}-${{ matrix.elixir_version }}-v3-${{ hashFiles('**/mix.lock') }} + - name: Fetch deps + if: steps.deps-cache.outputs.cache-hit != 'true' + run: mix deps.get + - name: Compile project + run: mix compile --warnings-as-errors + - name: Check formatting + run: mix format --check-formatted + if: matrix.check_formatted + - name: Test + run: mix test diff --git a/instrumentation/opentelemetry_commanded/.formatter.exs b/instrumentation/opentelemetry_commanded/.formatter.exs new file mode 100644 index 00000000..d2cda26e --- /dev/null +++ b/instrumentation/opentelemetry_commanded/.formatter.exs @@ -0,0 +1,4 @@ +# Used by "mix format" +[ + inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"] +] diff --git a/instrumentation/opentelemetry_commanded/.gitignore b/instrumentation/opentelemetry_commanded/.gitignore new file mode 100644 index 00000000..94a053ad --- /dev/null +++ b/instrumentation/opentelemetry_commanded/.gitignore @@ -0,0 +1,24 @@ +# The directory Mix will write compiled artifacts to. +/_build/ + +# If you run "mix test --cover", coverage assets end up here. +/cover/ + +# The directory Mix downloads your dependencies sources to. +/deps/ + +# Where third-party dependencies like ExDoc output generated docs. +/doc/ + +# Ignore .fetch files in case you like to edit your project deps locally. +/.fetch + +# If the VM crashes, it generates a dump, let's ignore it too. +erl_crash.dump + +# Also ignore archive artifacts (built via "mix archive.build"). +*.ez + +# Ignore package tarball (built via "mix hex.build"). +opentelemetry_commanded-*.tar + diff --git a/instrumentation/opentelemetry_commanded/LICENSE b/instrumentation/opentelemetry_commanded/LICENSE new file mode 100644 index 00000000..261eeb9e --- /dev/null +++ b/instrumentation/opentelemetry_commanded/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/instrumentation/opentelemetry_commanded/README.md b/instrumentation/opentelemetry_commanded/README.md new file mode 100644 index 00000000..e3001d0b --- /dev/null +++ b/instrumentation/opentelemetry_commanded/README.md @@ -0,0 +1,43 @@ +# OpentelemetryCommanded + +Telemetry handler that creates OpenTelemetry spans from [Commanded](https://github.com/commanded/commanded) commands and events. + +## Supported spans + +OpentelemetryCommanded currently creates spans for: + +- Application Dispatch +- Aggregate Execute +- Event.Handler Handle +- ProcessManager Handle +- EventStore `append_to_stream` and `stream_forward` + +## Installation + +If [available in Hex](https://hex.pm/docs/publish), the package can be installed +by adding `opentelemetry_commanded` to your list of dependencies in `mix.exs`: + +```elixir +def deps do + [ + {:opentelemetry_commanded, "~> 0.1.0"} + ] +end +``` + +Once installed, execute the following function in your application behaviour before your top-level supervisor starts. + +```elixir +# lib/my_app/application.ex +OpentelemetryCommanded.setup() +``` + +Then add the `OpentelemetryCommanded.Middleware` to your `Commanded` routers + +```elixir +middleware OpentelemetryCommanded.Middleware +``` + +## Documentation + +[https://hexdocs.pm/opentelemetry_commanded](https://hexdocs.pm/opentelemetry_commanded). diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/aggregate.ex b/instrumentation/opentelemetry_commanded/lib/commanded/aggregate.ex new file mode 100644 index 00000000..d3ee41b8 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/aggregate.ex @@ -0,0 +1,98 @@ +defmodule OpentelemetryCommanded.Aggregate do + @moduledoc false + + require OpenTelemetry.Tracer + + import OpentelemetryCommanded.Util + + alias OpenTelemetry.Span + + @tracer_id __MODULE__ + + def setup() do + :telemetry.attach( + {__MODULE__, :start}, + [:commanded, :aggregate, :execute, :start], + &__MODULE__.handle_start/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :stop}, + [:commanded, :aggregate, :execute, :stop], + &__MODULE__.handle_stop/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :exception}, + [:commanded, :aggregate, :execute, :exception], + &__MODULE__.handle_exception/4, + [] + ) + end + + def handle_start(_event, _, meta, _) do + context = meta.execution_context + + safe_context_propagation(context.metadata["trace_ctx"]) + + attributes = [ + "commanded.aggregate_uuid": meta.aggregate_uuid, + "commanded.aggregate_version": meta.aggregate_version, + "commanded.application": meta.application, + "commanded.causation_id": context.causation_id, + "commanded.command": struct_name(context.command), + "commanded.correlation_id": context.correlation_id, + "commanded.function": context.function, + "messaging.conversation_id": context.correlation_id, + "messaging.destination": context.handler, + "messaging.destination_kind": "aggregate", + "messaging.message_id": context.causation_id, + "messaging.operation": "receive", + "messaging.system": "commanded" + ] + + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "commanded.aggregate.execute", + meta, + %{ + kind: :consumer, + attributes: attributes + } + ) + end + + def handle_stop(_event, _measurements, meta, _) do + # ensure the correct span is current + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + events = Map.get(meta, :events, []) + Span.set_attribute(ctx, :"commanded.event_count", Enum.count(events)) + + if error = meta[:error] do + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error))) + end + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end + + def handle_exception( + _event, + _measurements, + %{kind: kind, reason: reason, stacktrace: stacktrace} = meta, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + # try to normalize all errors to Elixir exceptions + exception = Exception.normalize(kind, reason, stacktrace) + + # record exception and mark the span as errored + Span.record_exception(ctx, exception, stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason))) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/application.ex b/instrumentation/opentelemetry_commanded/lib/commanded/application.ex new file mode 100644 index 00000000..3d083bd8 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/application.ex @@ -0,0 +1,68 @@ +defmodule OpentelemetryCommanded.Application do + @moduledoc false + + require OpenTelemetry.Tracer + + import OpentelemetryCommanded.Util + + alias OpenTelemetry.Span + + @tracer_id __MODULE__ + + def setup do + :telemetry.attach( + {__MODULE__, :start}, + [:commanded, :application, :dispatch, :start], + &__MODULE__.handle_start/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :stop}, + [:commanded, :application, :dispatch, :stop], + &__MODULE__.handle_stop/4, + [] + ) + end + + def handle_start(_event, _, meta, _) do + context = meta.execution_context + + safe_context_propagation(context.metadata["trace_ctx"]) + + attributes = [ + "commanded.application": meta.application, + "commanded.causation_id": context.causation_id, + "commanded.command": struct_name(context.command), + "commanded.correlation_id": context.correlation_id, + "commanded.function": context.function, + "messaging.conversation_id": context.correlation_id, + "messaging.destination": context.handler, + "messaging.destination_kind": "command_handler", + "messaging.message_id": context.causation_id, + "messaging.operation": "receive", + "messaging.system": "commanded" + ] + + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "commanded.application.dispatch", + meta, + %{ + kind: :consumer, + attributes: attributes + } + ) + end + + def handle_stop(_event, _measurements, meta, _) do + # ensure the correct span is current and update the status + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + if error = meta[:error] do + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error))) + end + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/event_handler.ex b/instrumentation/opentelemetry_commanded/lib/commanded/event_handler.ex new file mode 100644 index 00000000..dd87329f --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/event_handler.ex @@ -0,0 +1,100 @@ +defmodule OpentelemetryCommanded.EventHandler do + @moduledoc false + + require OpenTelemetry.Tracer + + import OpentelemetryCommanded.Util + + alias OpenTelemetry.Span + + @tracer_id __MODULE__ + + def setup do + :telemetry.attach( + {__MODULE__, :start}, + [:commanded, :event, :handle, :start], + &__MODULE__.handle_start/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :stop}, + [:commanded, :event, :handle, :stop], + &__MODULE__.handle_stop/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :exception}, + [:commanded, :event, :handle, :exception], + &__MODULE__.handle_exception/4, + [] + ) + end + + def handle_start(_event, _measurements, meta, _) do + recorded_event = meta.recorded_event + + safe_context_propagation(recorded_event.metadata["trace_ctx"]) + + attributes = [ + "commanded.application": meta.application, + "commanded.causation_id": recorded_event.causation_id, + "commanded.correlation_id": recorded_event.correlation_id, + "commanded.event": recorded_event.event_type, + "commanded.event_id": recorded_event.event_id, + "commanded.event_number": recorded_event.event_number, + "commanded.handler_name": meta.handler_name, + "commanded.stream_id": recorded_event.stream_id, + "commanded.stream_version": recorded_event.stream_version, + "messaging.conversation_id": recorded_event.correlation_id, + "messaging.destination": meta.handler_module, + "messaging.destination_kind": "event_handler", + "messaging.message_id": recorded_event.causation_id, + "messaging.operation": "receive", + "messaging.system": "commanded" + # TODO add back + # consistency: meta.consistency, + # TODO add this back into commanded + # "event.last_seen": meta.last_seen_event + ] + + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "commanded.event.handle", + meta, + %{ + kind: :consumer, + attributes: attributes + } + ) + end + + def handle_stop(_event, _measurements, meta, _) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + if error = meta[:error] do + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error))) + end + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end + + def handle_exception( + _event, + _measurements, + %{kind: kind, reason: reason, stacktrace: stacktrace} = meta, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + # try to normalize all errors to Elixir exceptions + exception = Exception.normalize(kind, reason, stacktrace) + + # record exception and mark the span as errored + Span.record_exception(ctx, exception, stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason))) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/event_store.ex b/instrumentation/opentelemetry_commanded/lib/commanded/event_store.ex new file mode 100644 index 00000000..182f3282 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/event_store.ex @@ -0,0 +1,107 @@ +defmodule OpentelemetryCommanded.EventStore do + @moduledoc false + + require OpenTelemetry.Tracer + + import OpentelemetryCommanded.Util + + alias OpenTelemetry.Span + + @tracer_id __MODULE__ + + def setup do + ~w( + ack_event + adapter + append_to_stream + delete_snapshot + delete_subscription + read_snapshot + record_snapshot + stream_forward + stream_forward + stream_forward + subscribe + subscribe_to + subscribe_to + unsubscribe + )a + |> Enum.each(fn event -> + :telemetry.attach( + {__MODULE__, :start}, + [:commanded, :event_store, event, :start], + &__MODULE__.handle_start/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :stop}, + [:commanded, :event_store, event, :stop], + &__MODULE__.handle_stop/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :exception}, + [:commanded, :event_store, event, :exception], + &__MODULE__.handle_exception/4, + [] + ) + end) + end + + def handle_start([_, _, action, _type], _measurements, meta, _) do + event = meta.event + + safe_context_propagation(event.metadata["trace_ctx"]) + + attributes = [ + "commanded.application": meta.application, + "commanded.causation_id": event.causation_id, + "commanded.correlation_id": event.correlation_id, + "commanded.event": event.event_type, + "commanded.event_id": event.event_id, + "commanded.event_number": event.event_number, + "commanded.stream_id": event.stream_id, + "commanded.stream_version": event.stream_version + ] + + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "commanded.event_store.#{action}", + meta, + %{ + kind: :internal, + attributes: attributes + } + ) + end + + def handle_stop(_event, _measurements, meta, _) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + if error = meta[:error] do + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error))) + end + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end + + def handle_exception( + _event, + _measurements, + %{kind: kind, reason: reason, stacktrace: stacktrace} = meta, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + # try to normalize all errors to Elixir exceptions + exception = Exception.normalize(kind, reason, stacktrace) + + # record exception and mark the span as errored + Span.record_exception(ctx, exception, stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason))) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/middleware.ex b/instrumentation/opentelemetry_commanded/lib/commanded/middleware.ex new file mode 100644 index 00000000..344e556c --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/middleware.ex @@ -0,0 +1,36 @@ +defmodule OpentelemetryCommanded.Middleware do + @moduledoc """ + A middleware for propagating span context to Aggregates, Event Handlers, etc + + Usage: + + ```elixir + # In your commanded router + + middleware OpentelemetryCommanded.Middleware + ``` + """ + + @behaviour Commanded.Middleware + + require OpenTelemetry.Tracer + + import Commanded.Middleware.Pipeline + import OpentelemetryCommanded.Util + + alias Commanded.Middleware.Pipeline + + def before_dispatch(%Pipeline{} = pipeline) do + trace_headers = :otel_propagator_text_map.inject([]) + + assign_metadata(pipeline, "trace_ctx", encode_headers(trace_headers)) + end + + def after_dispatch(pipeline) do + pipeline + end + + def after_failure(pipeline) do + pipeline + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/process_manager.ex b/instrumentation/opentelemetry_commanded/lib/commanded/process_manager.ex new file mode 100644 index 00000000..64dd16ac --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/process_manager.ex @@ -0,0 +1,104 @@ +defmodule OpentelemetryCommanded.ProcessManager do + @moduledoc false + + require OpenTelemetry.Tracer + + import OpentelemetryCommanded.Util + + alias OpenTelemetry.Span + + @tracer_id __MODULE__ + + def setup do + :telemetry.attach( + {__MODULE__, :start}, + [:commanded, :process_manager, :handle, :start], + &__MODULE__.handle_start/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :stop}, + [:commanded, :process_manager, :handle, :stop], + &__MODULE__.handle_stop/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :exception}, + [:commanded, :process_manager, :handle, :exception], + &__MODULE__.handle_exception/4, + [] + ) + end + + def handle_start(_event, _, meta, _) do + recorded_event = meta.recorded_event + safe_context_propagation(recorded_event.metadata["trace_ctx"]) + + attributes = [ + "commanded.application": meta.application, + "commanded.causation_id": recorded_event.causation_id, + "commanded.correlation_id": recorded_event.correlation_id, + "commanded.event": recorded_event.event_type, + "commanded.event_id": recorded_event.event_id, + "commanded.event_number": recorded_event.event_number, + "commanded.handler_name": meta.process_manager_name, + "commanded.process_uuid": meta.process_uuid, + "commanded.stream_id": recorded_event.stream_id, + "commanded.stream_version": recorded_event.stream_version, + "messaging.conversation_id": recorded_event.correlation_id, + "messaging.destination": meta.process_manager_module, + "messaging.destination_kind": "process_manager", + "messaging.message_id": recorded_event.causation_id, + "messaging.operation": "receive", + "messaging.system": "commanded" + # TODO add back + # consistency: meta.consistency, + # TODO add this back into commanded + # "event.last_seen": meta.last_seen_event + ] + + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "commanded.process_manager.handle", + meta, + %{ + kind: :consumer, + attributes: attributes + } + ) + end + + def handle_stop(_event, _measurements, meta, _) do + # ensure the correct span is current + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + commands = Map.get(meta, :commands, []) + Span.set_attribute(ctx, :"commanded.command_count", Enum.count(commands)) + + if error = meta[:error] do + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(error))) + end + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end + + def handle_exception( + _event, + _measurements, + %{kind: kind, reason: reason, stacktrace: stacktrace} = meta, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + # try to normalize all errors to Elixir exceptions + exception = Exception.normalize(kind, reason, stacktrace) + + # record exception and mark the span as errored + Span.record_exception(ctx, exception, stacktrace) + Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason))) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/commanded/util.ex b/instrumentation/opentelemetry_commanded/lib/commanded/util.ex new file mode 100644 index 00000000..e6e0422f --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/commanded/util.ex @@ -0,0 +1,35 @@ +defmodule OpentelemetryCommanded.Util do + @moduledoc false + + def safe_context_propagation(trace_ctx) when is_nil(trace_ctx) do + nil + end + + def safe_context_propagation(trace_ctx) do + trace_ctx + |> decode_headers() + |> :otel_propagator_text_map.extract() + end + + def encode_headers(headers), do: Enum.map(headers, &Tuple.to_list/1) + + def decode_headers(headers), do: Enum.map(headers, &List.to_tuple/1) + + def encode_ctx(:undefined), do: :undefined + def encode_ctx(ctx), do: Tuple.to_list(ctx) + + def decode_ctx("undefined"), do: :undefined + def decode_ctx(:undefined), do: :undefined + + def decode_ctx(ctx) do + Enum.map(ctx, fn + el when is_binary(el) -> String.to_existing_atom(el) + el -> el + end) + |> List.to_tuple() + end + + def struct_name(%name{}) do + name + end +end diff --git a/instrumentation/opentelemetry_commanded/lib/opentelemetry_commanded.ex b/instrumentation/opentelemetry_commanded/lib/opentelemetry_commanded.ex new file mode 100644 index 00000000..d7fa631e --- /dev/null +++ b/instrumentation/opentelemetry_commanded/lib/opentelemetry_commanded.ex @@ -0,0 +1,11 @@ +defmodule OpentelemetryCommanded do + @moduledoc File.read!("./README.md") |> String.split("\n") |> Enum.drop(2) |> Enum.join("\n") + + def setup do + OpentelemetryCommanded.Application.setup() + OpentelemetryCommanded.Aggregate.setup() + OpentelemetryCommanded.EventHandler.setup() + OpentelemetryCommanded.EventStore.setup() + OpentelemetryCommanded.ProcessManager.setup() + end +end diff --git a/instrumentation/opentelemetry_commanded/mix.exs b/instrumentation/opentelemetry_commanded/mix.exs new file mode 100644 index 00000000..957f6258 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/mix.exs @@ -0,0 +1,69 @@ +defmodule OpentelemetryCommanded.MixProject do + use Mix.Project + + @version "0.2.0" + @github_url "https://github.com/open-telemetry/opentelemetry-erlang-contrib/blob/main/instrumentation/opentelemetry_commanded" + + def project do + [ + app: :opentelemetry_commanded, + version: @version, + elixir: "~> 1.14", + elixirc_paths: elixirc_paths(Mix.env()), + start_permanent: Mix.env() == :prod, + package: package(), + deps: deps(), + description: "Trace Commanded CQRS operations with OpenTelemetry", + source_url: @github_url, + homepage_url: @github_url, + name: "Opentelemetry Commanded", + docs: docs() + ] + end + + defp elixirc_paths(:test), do: ["lib", "test/support"] + defp elixirc_paths(_), do: ["lib"] + + def application do + [extra_applications: [:logger]] + end + + defp package do + [ + licenses: ["Apache-2"], + links: %{ + "GitHub" => @github_url, + "OpenTelemetry Erlang" => "https://github.com/open-telemetry/opentelemetry-erlang", + "OpenTelemetry Erlang Contrib" => + "https://github.com/open-telemetry/opentelemetry-erlang-contrib", + "OpenTelemetry.io" => "https://opentelemetry.io" + } + ] + end + + defp deps do + [ + {:commanded, "~> 1.4"}, + {:opentelemetry_telemetry, "~> 1.0"}, + {:telemetry, "~> 1.0"}, + {:opentelemetry, "~> 1.0"}, + + # Testing + {:jason, "~> 1.2", only: :test}, + {:ecto, "~> 3.12", only: :test}, + {:ex_doc, ">= 0.0.0", only: [:dev], runtime: false} + ] + end + + defp docs do + [ + main: "OpentelemetryCommanded", + skip_undefined_reference_warnings_on: ["CHANGELOG.md"], + source_url_pattern: "#{@github_url}/%{path}#L%{line}", + extras: [ + "README.md", + "CHANGELOG.md" + ] + ] + end +end diff --git a/instrumentation/opentelemetry_commanded/mix.lock b/instrumentation/opentelemetry_commanded/mix.lock new file mode 100644 index 00000000..13fe4c4e --- /dev/null +++ b/instrumentation/opentelemetry_commanded/mix.lock @@ -0,0 +1,20 @@ +%{ + "backoff": {:hex, :backoff, "1.1.6", "83b72ed2108ba1ee8f7d1c22e0b4a00cfe3593a67dbc792799e8cce9f42f796b", [:rebar3], [], "hexpm", "cf0cfff8995fb20562f822e5cc47d8ccf664c5ecdc26a684cbe85c225f9d7c39"}, + "commanded": {:hex, :commanded, "1.4.1", "928b8357ebe1817f88b109693b4d717d20c11ef45cebe42a71dee0a56be36c2c", [:mix], [{:backoff, "~> 1.1", [hex: :backoff, repo: "hexpm", optional: false]}, {:jason, "~> 1.3", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.2 or ~> 0.3", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "6cd94b4b3369871c030a83b934548720cc834ec7b8549ba031510120aceb7ef9"}, + "decimal": {:hex, :decimal, "2.2.0", "df3d06bb9517e302b1bd265c1e7f16cda51547ad9d99892049340841f3e15836", [:mix], [], "hexpm", "af8daf87384b51b7e611fb1a1f2c4d4876b65ef968fa8bd3adf44cff401c7f21"}, + "earmark_parser": {:hex, :earmark_parser, "1.4.28", "0bf6546eb7cd6185ae086cbc5d20cd6dbb4b428aad14c02c49f7b554484b4586", [:mix], [], "hexpm", "501cef12286a3231dc80c81352a9453decf9586977f917a96e619293132743fb"}, + "ecto": {:hex, :ecto, "3.12.5", "4a312960ce612e17337e7cefcf9be45b95a3be6b36b6f94dfb3d8c361d631866", [:mix], [{:decimal, "~> 2.0", [hex: :decimal, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "6eb18e80bef8bb57e17f5a7f068a1719fbda384d40fc37acb8eb8aeca493b6ea"}, + "elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"}, + "ex_doc": {:hex, :ex_doc, "0.28.5", "3e52a6d2130ce74d096859e477b97080c156d0926701c13870a4e1f752363279", [:mix], [{:earmark_parser, "~> 1.4.19", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "d2c4b07133113e9aa3e9ba27efb9088ba900e9e51caa383919676afdf09ab181"}, + "jason": {:hex, :jason, "1.4.4", "b9226785a9aa77b6857ca22832cffa5d5011a667207eb2a0ad56adb5db443b8a", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "c5eb0cab91f094599f94d55bc63409236a8ec69a21a67814529e8d5f6cc90b3b"}, + "makeup": {:hex, :makeup, "1.1.0", "6b67c8bc2882a6b6a445859952a602afc1a41c2e08379ca057c0f525366fc3ca", [:mix], [{:nimble_parsec, "~> 1.2.2 or ~> 1.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "0a45ed501f4a8897f580eabf99a2e5234ea3e75a4373c8a52824f6e873be57a6"}, + "makeup_elixir": {:hex, :makeup_elixir, "0.16.0", "f8c570a0d33f8039513fbccaf7108c5d750f47d8defd44088371191b76492b0b", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}, {:nimble_parsec, "~> 1.2.3", [hex: :nimble_parsec, repo: "hexpm", optional: false]}], "hexpm", "28b2cbdc13960a46ae9a8858c4bebdec3c9a6d7b4b9e7f4ed1502f8159f338e7"}, + "makeup_erlang": {:hex, :makeup_erlang, "0.1.1", "3fcb7f09eb9d98dc4d208f49cc955a34218fc41ff6b84df7c75b3e6e533cc65f", [:mix], [{:makeup, "~> 1.0", [hex: :makeup, repo: "hexpm", optional: false]}], "hexpm", "174d0809e98a4ef0b3309256cbf97101c6ec01c4ab0b23e926a9e17df2077cbb"}, + "nimble_parsec": {:hex, :nimble_parsec, "1.2.3", "244836e6e3f1200c7f30cb56733fd808744eca61fd182f731eac4af635cc6d0b", [:mix], [], "hexpm", "c8d789e39b9131acf7b99291e93dae60ab48ef14a7ee9d58c6964f59efb570b0"}, + "opentelemetry": {:hex, :opentelemetry, "1.1.1", "02de53d7dcafc087793ddf98cac946aaaa13c99cb6a7e568d9bb5ce4552b340e", [:rebar3], [{:opentelemetry_api, "~> 1.1", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}], "hexpm", "43a807d536bca55542731ddb5ecf68c0b3b433ff98713a6496058075bac70031"}, + "opentelemetry_api": {:hex, :opentelemetry_api, "1.1.0", "156366bfbf249f54daf2626e087e29ad91201eab670993fd9ae1bd278d03a096", [:mix, :rebar3], [], "hexpm", "e0d0b49e21e5785da675c97104c385283cae84fcc0d8522932a5dcf55489ead1"}, + "opentelemetry_erlang_contrib": {:git, "https://github.com/open-telemetry/opentelemetry-erlang-contrib.git", "fb2595d2fbf4935c169c561aaf169bab3c527e9a", [ref: "fb2595d2fbf4935c169c561aaf169bab3c527e9a"]}, + "opentelemetry_telemetry": {:hex, :opentelemetry_telemetry, "1.0.0", "d5982a319e725fcd2305b306b65c18a86afdcf7d96821473cf0649ff88877615", [:mix, :rebar3], [{:opentelemetry_api, "~> 1.0", [hex: :opentelemetry_api, repo: "hexpm", optional: false]}, {:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:telemetry_registry, "~> 0.3.0", [hex: :telemetry_registry, repo: "hexpm", optional: false]}], "hexpm", "3401d13a1d4b7aa941a77e6b3ec074f0ae77f83b5b2206766ce630123a9291a9"}, + "telemetry": {:hex, :telemetry, "1.3.0", "fedebbae410d715cf8e7062c96a1ef32ec22e764197f70cda73d82778d61e7a2", [:rebar3], [], "hexpm", "7015fc8919dbe63764f4b4b87a95b7c0996bd539e0d499be6ec9d7f3875b79e6"}, + "telemetry_registry": {:hex, :telemetry_registry, "0.3.0", "6768f151ea53fc0fbca70dbff5b20a8d663ee4e0c0b2ae589590e08658e76f1e", [:mix, :rebar3], [{:telemetry, "~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "492e2adbc609f3e79ece7f29fec363a97a2c484ac78a83098535d6564781e917"}, +} diff --git a/instrumentation/opentelemetry_commanded/test/commanded/aggregate_test.exs b/instrumentation/opentelemetry_commanded/test/commanded/aggregate_test.exs new file mode 100644 index 00000000..d60971c9 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/commanded/aggregate_test.exs @@ -0,0 +1,149 @@ +defmodule OpentelemetryCommanded.AggregateTest do + use OpentelemetryCommanded.CommandedCase, async: false + + import ExUnit.CaptureLog + + alias OpentelemetryCommanded.DummyApp.Commands, as: C + + describe "dispatch command when Telemetry attached" do + setup _ do + case OpentelemetryCommanded.Aggregate.setup() do + :ok -> :ok + {:error, :already_exists} -> :ok + end + end + + test "Success should create span", context do + :ok = app_dispatch(context, %C.Ok{id: "ACC123"}) + + assert_receive {:span, + span( + name: "commanded.aggregate.execute", + kind: :consumer, + parent_span_id: parent_span_id, + attributes: attributes + )} + + # Get parent span to ensure context has been propagated across the process + assert_receive {:span, span(name: parent_span_name, span_id: ^parent_span_id)} + + assert parent_span_name in [ + "opentelemetry_commanded.test", + "commanded.application.dispatch" + ] + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.command": OpentelemetryCommanded.DummyApp.Commands.Ok + }, + attributes + ) + end + + test "Error should create span with error set", context do + _log = + capture_log(fn -> + # TODO: shouldn't be same command as for application_test + {:error, "some error"} = + app_dispatch(context, %C.Error{id: "ACC123", message: "some error"}) + end) + + assert_receive {:span, + span( + name: "commanded.aggregate.execute", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.command": OpentelemetryCommanded.DummyApp.Commands.Error + }, + attributes + ) + + assert exception_message =~ "some error" + end + + test "Exception should create span with error set", context do + _log = + capture_log(fn -> + {:error, %RuntimeError{message: "some error"}} = + app_dispatch(context, %C.RaiseException{id: "ACC123", message: "some error"}) + end) + + assert_receive {:span, + span( + name: "commanded.aggregate.execute", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes, + events: events + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.command": OpentelemetryCommanded.DummyApp.Commands.RaiseException + }, + attributes + ) + + assert exception_message =~ "some error" + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) + + event_attributes = :otel_attributes.map(event_attributes) + + assert match?( + %{ + "exception.message" => "some error", + "exception.stacktrace" => _, + "exception.type" => "Elixir.RuntimeError" + }, + event_attributes + ) + + stack_trace = event_attributes["exception.stacktrace"] + assert stack_trace =~ "OpentelemetryCommanded.DummyApp.Handler.handle/2" + end + end + + defp has_basic_attributes!(attributes, correlation_id) do + assert match?( + %{ + "commanded.aggregate_uuid": "ACC123", + "commanded.aggregate_version": 0, + "commanded.application": OpentelemetryCommanded.DummyApp.App, + "commanded.causation_id": _, + "commanded.command": _, + "commanded.correlation_id": ^correlation_id, + "commanded.function": :handle, + "messaging.conversation_id": ^correlation_id, + "messaging.destination": OpentelemetryCommanded.DummyApp.Handler, + "messaging.destination_kind": "aggregate", + "messaging.message_id": _, + "messaging.operation": "receive", + "messaging.system": "commanded" + }, + attributes + ) + end +end diff --git a/instrumentation/opentelemetry_commanded/test/commanded/application_test.exs b/instrumentation/opentelemetry_commanded/test/commanded/application_test.exs new file mode 100644 index 00000000..0e5e6565 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/commanded/application_test.exs @@ -0,0 +1,131 @@ +defmodule OpentelemetryCommanded.ApplicationTest do + use OpentelemetryCommanded.CommandedCase, async: false + + alias OpentelemetryCommanded.DummyApp + alias OpentelemetryCommanded.DummyApp.Commands, as: C + + import ExUnit.CaptureLog + require Logger + + describe "dispatch command when telemetry attached" do + setup _ do + case OpentelemetryCommanded.Application.setup() do + :ok -> :ok + {:error, :already_exists} -> :ok + end + end + + test "Success should create span", context do + :ok = app_dispatch(context, %C.Ok{id: "ACC123"}) + + assert_receive {:span, + span( + name: "commanded.application.dispatch", + kind: :consumer, + parent_span_id: parent_span_id, + attributes: attributes + )} + + # Get parent span to ensure context has been propagated across the process + assert_receive {:span, span(name: parent_span_name, span_id: ^parent_span_id)} + assert parent_span_name in ["opentelemetry_commanded.test"] + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{"commanded.command": DummyApp.Commands.Ok}, + attributes + ) + end + + test "when error create span with error status", context do + # TODO: this needs to be done better when Commanded Application dispatch spans fixed + {:error, "some error"} = + app_dispatch( + context, + %C.Error{id: Ecto.UUID.generate(), message: "some error"} + ) + + assert_receive {:span, + span( + name: "commanded.application.dispatch", + kind: :consumer, + status: {:status, :error, "\"some error\""}, + attributes: attributes + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{"commanded.command": DummyApp.Commands.Error}, + attributes + ) + end + + test "when no handler for Command, create span with error status", context do + captured_log = + capture_log(fn -> + {:error, + %FunctionClauseError{ + args: nil, + arity: 2, + clauses: nil, + function: :handle, + kind: nil, + module: OpentelemetryCommanded.DummyApp.Handler + }} = + app_dispatch(context, %C.RaiseException{ + id: Ecto.UUID.generate(), + message: "no handler" + }) + end) + + assert captured_log =~ "%FunctionClauseError{" + assert captured_log =~ "module: OpentelemetryCommanded.DummyApp.Handler" + assert captured_log =~ "function: :handle" + assert captured_log =~ "arity: 2" + + assert_receive {:span, + span( + name: "commanded.application.dispatch", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes + )} + + assert exception_message =~ "%FunctionClauseError{" + assert exception_message =~ "module: OpentelemetryCommanded.DummyApp.Handler" + assert exception_message =~ "function: :handle" + assert exception_message =~ "arity: 2" + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{"commanded.command": DummyApp.Commands.RaiseException}, + attributes + ) + end + end + + defp has_basic_attributes!(attributes, correlation_id) do + assert match?( + %{ + "commanded.application": DummyApp.App, + "commanded.function": :handle, + "messaging.conversation_id": ^correlation_id, + "messaging.destination": DummyApp.Handler, + "messaging.destination_kind": "command_handler", + "messaging.message_id": _, + "messaging.operation": "receive", + "messaging.system": "commanded" + }, + attributes + ) + end +end diff --git a/instrumentation/opentelemetry_commanded/test/commanded/event_handler_test.exs b/instrumentation/opentelemetry_commanded/test/commanded/event_handler_test.exs new file mode 100644 index 00000000..4b5fd058 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/commanded/event_handler_test.exs @@ -0,0 +1,158 @@ +defmodule OpentelemetryCommanded.EventHandlerTest do + use OpentelemetryCommanded.CommandedCase, async: false + + import ExUnit.CaptureLog + + alias OpentelemetryCommanded.DummyApp.Commands, as: C + alias OpentelemetryCommanded.DummyApp.Events, as: E + + describe "dispatch command when Telemetry attached" do + setup _ do + case OpentelemetryCommanded.EventHandler.setup() do + :ok -> :ok + {:error, :already_exists} -> :ok + end + end + + test "Success should create span", context do + :ok = app_dispatch(context, %C.Ok{id: "ACC123"}) + + assert_receive {:span, + span( + name: "commanded.event.handle", + kind: :consumer, + parent_span_id: parent_span_id, + attributes: attributes + )} + + # Get parent span to ensure context has been propagated across the process + assert_receive {:span, span(name: parent_span_name, span_id: ^parent_span_id)} + + assert parent_span_name in [ + "opentelemetry_commanded.test", + "commanded.application.dispatch" + ] + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": "Elixir.OpentelemetryCommanded.DummyApp.Events.OkEvent" + }, + attributes + ) + end + + test "Error should create span with error set", context do + _log = + capture_log(fn -> + :ok = + app_dispatch(context, %C.DoEvent{ + id: "ACC123", + event: %E.ErrorInEventHandlerEvent{message: "some error"} + }) + end) + + assert_receive {:span, + span( + name: "commanded.event.handle", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": + "Elixir.OpentelemetryCommanded.DummyApp.Events.ErrorInEventHandlerEvent" + }, + attributes + ) + + assert exception_message =~ "some error" + end + + test "Exception should create span with error set", context do + _log = + capture_log(fn -> + :ok = + app_dispatch(context, %C.DoEvent{ + id: "ACC123", + event: %E.ExceptionInEventHandlerEvent{message: "some error"} + }) + end) + + assert_receive {:span, + span( + name: "commanded.event.handle", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes, + events: events + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": + "Elixir.OpentelemetryCommanded.DummyApp.Events.ExceptionInEventHandlerEvent" + }, + attributes + ) + + assert exception_message =~ "some error" + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) + + event_attributes = :otel_attributes.map(event_attributes) + + assert match?( + %{ + "exception.message" => "some error", + "exception.stacktrace" => _, + "exception.type" => "Elixir.RuntimeError" + }, + event_attributes + ) + + stack_trace = event_attributes["exception.stacktrace"] + assert stack_trace =~ "OpentelemetryCommanded.DummyApp.EventHandler.handle/2" + end + end + + defp has_basic_attributes!(attributes, correlation_id) do + assert match?( + %{ + "commanded.application": OpentelemetryCommanded.DummyApp.App, + "commanded.causation_id": _, + "commanded.correlation_id": ^correlation_id, + "commanded.event_id": _, + "commanded.event_number": 1, + "commanded.handler_name": "EventHandler", + "commanded.stream_id": "ACC123", + "commanded.stream_version": 1, + "messaging.conversation_id": ^correlation_id, + "messaging.destination": OpentelemetryCommanded.DummyApp.EventHandler, + "messaging.destination_kind": "event_handler", + "messaging.message_id": _, + "messaging.operation": "receive", + "messaging.system": "commanded" + }, + attributes + ) + end +end diff --git a/instrumentation/opentelemetry_commanded/test/commanded/event_store_test.exs b/instrumentation/opentelemetry_commanded/test/commanded/event_store_test.exs new file mode 100644 index 00000000..e5274360 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/commanded/event_store_test.exs @@ -0,0 +1,61 @@ +defmodule OpentelemetryCommanded.EventStoreTest do + use OpentelemetryCommanded.CommandedCase, async: false + + alias OpentelemetryCommanded.DummyApp.Commands, as: C + + describe "dispatch command when Telemetry attached" do + setup _ do + case OpentelemetryCommanded.EventStore.setup() do + :ok -> :ok + {:error, :already_exists} -> :ok + end + end + + test "Success should create span", context do + :ok = app_dispatch(context, %C.Ok{id: "ACC123"}) + + assert_receive {:span, + span( + name: "commanded.event_store.ack_event", + kind: :internal, + parent_span_id: parent_span_id, + attributes: attributes + )} + + # Get parent span to ensure context has been propagated across the process + assert_receive {:span, span(name: parent_span_name, span_id: ^parent_span_id)} + + assert parent_span_name in [ + "opentelemetry_commanded.test", + "commanded.application.dispatch" + ] + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": "Elixir.OpentelemetryCommanded.DummyApp.Events.OkEvent" + }, + attributes + ) + end + end + + defp has_basic_attributes!(attributes, correlation_id) do + assert match?( + %{ + "commanded.application": OpentelemetryCommanded.DummyApp.App, + "commanded.causation_id": _, + "commanded.correlation_id": ^correlation_id, + "commanded.event": _, + "commanded.event_id": _, + "commanded.event_number": 1, + "commanded.stream_id": "ACC123", + "commanded.stream_version": 1 + }, + attributes + ) + end +end diff --git a/instrumentation/opentelemetry_commanded/test/commanded/process_manager_test.exs b/instrumentation/opentelemetry_commanded/test/commanded/process_manager_test.exs new file mode 100644 index 00000000..30adde81 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/commanded/process_manager_test.exs @@ -0,0 +1,160 @@ +defmodule OpentelemetryCommanded.ProcessManagerTest do + use OpentelemetryCommanded.CommandedCase, async: false + + import ExUnit.CaptureLog + + alias OpentelemetryCommanded.DummyApp.Commands, as: C + alias OpentelemetryCommanded.DummyApp.Events, as: E + + describe "dispatch command when Telemetry attached" do + setup _ do + case OpentelemetryCommanded.ProcessManager.setup() do + :ok -> :ok + {:error, :already_exists} -> :ok + end + end + + test "Success should create span", context do + :ok = app_dispatch(context, %C.Ok{id: "ACC123"}) + + assert_receive {:span, + span( + name: "commanded.process_manager.handle", + kind: :consumer, + parent_span_id: parent_span_id, + attributes: attributes + )} + + # Get parent span to ensure context has been propagated across the process + assert_receive {:span, span(name: parent_span_name, span_id: ^parent_span_id)} + + assert parent_span_name in [ + "opentelemetry_commanded.test", + "commanded.application.dispatch" + ] + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": "Elixir.OpentelemetryCommanded.DummyApp.Events.OkEvent" + }, + attributes + ) + end + + test "Error should create span with error set", context do + _log = + capture_log(fn -> + :ok = + app_dispatch(context, %C.DoEvent{ + id: "ACC123", + event: %E.ErrorInProcessManagerEvent{id: "ACC123", message: "some error"} + }) + end) + + assert_receive {:span, + span( + name: "commanded.process_manager.handle", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": + "Elixir.OpentelemetryCommanded.DummyApp.Events.ErrorInProcessManagerEvent" + }, + attributes + ) + + assert exception_message =~ "some error" + end + + test "Exception should create span with error set", context do + _log = + capture_log(fn -> + :ok = + app_dispatch(context, %C.DoEvent{ + id: "ACC123", + event: %E.ExceptionInProcessManagerEvent{id: "ACC123", message: "some error"} + }) + end) + + assert_receive {:span, + span( + name: "commanded.process_manager.handle", + kind: :consumer, + status: {:status, :error, exception_message}, + attributes: attributes, + events: events + )} + + attributes = :otel_attributes.map(attributes) + + has_basic_attributes!(attributes, context.correlation_id) + + assert match?( + %{ + "commanded.event": + "Elixir.OpentelemetryCommanded.DummyApp.Events.ExceptionInProcessManagerEvent" + }, + attributes + ) + + assert exception_message =~ "some error" + + [ + event( + name: "exception", + attributes: event_attributes + ) + ] = :otel_events.list(events) + + event_attributes = :otel_attributes.map(event_attributes) + + assert match?( + %{ + "exception.message" => "some error", + "exception.stacktrace" => _, + "exception.type" => "Elixir.RuntimeError" + }, + event_attributes + ) + + stack_trace = event_attributes["exception.stacktrace"] + assert stack_trace =~ "OpentelemetryCommanded.DummyApp.ProcessManager.handle/2" + end + end + + defp has_basic_attributes!(attributes, correlation_id) do + assert match?( + %{ + "commanded.application": OpentelemetryCommanded.DummyApp.App, + "commanded.causation_id": _, + "commanded.correlation_id": ^correlation_id, + "commanded.event": _, + "commanded.event_id": _, + "commanded.event_number": 1, + "commanded.handler_name": "ProcessManager", + "commanded.process_uuid": "ACC123", + "commanded.stream_id": "ACC123", + "commanded.stream_version": 1, + "messaging.conversation_id": ^correlation_id, + "messaging.destination": OpentelemetryCommanded.DummyApp.ProcessManager, + "messaging.destination_kind": "process_manager", + "messaging.message_id": _, + "messaging.operation": "receive", + "messaging.system": "commanded" + }, + attributes + ) + end +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/aggregate.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/aggregate.ex new file mode 100644 index 00000000..d280550c --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/aggregate.ex @@ -0,0 +1,10 @@ +defmodule OpentelemetryCommanded.DummyApp.Aggregate do + @moduledoc false + + @derive Jason.Encoder + defstruct [:id, calls: 0] + + alias OpentelemetryCommanded.DummyApp.Aggregate + + def apply(%Aggregate{} = state, _event), do: %Aggregate{state | calls: state.calls + 1} +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/app.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/app.ex new file mode 100644 index 00000000..31026f11 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/app.ex @@ -0,0 +1,16 @@ +defmodule OpentelemetryCommanded.DummyApp.App do + alias Commanded.EventStore.Adapters.InMemory + alias Commanded.Serialization.JsonSerializer + alias OpentelemetryCommanded.DummyApp.Router + + use Commanded.Application, + otp_app: :commanded, + event_store: [ + adapter: InMemory, + serializer: JsonSerializer + ], + pubsub: :local, + registry: :local + + router(Router) +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/command_validator_middleware.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/command_validator_middleware.ex new file mode 100644 index 00000000..46ac8b8e --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/command_validator_middleware.ex @@ -0,0 +1,23 @@ +defmodule OpentelemetryCommanded.DummyApp.CommandValidatorMiddleware do + @behaviour Commanded.Middleware + + alias Commanded.Middleware.Pipeline + alias OpentelemetryCommanded.DummyApp.Commands, as: C + + def before_dispatch(%Pipeline{command: %C.DispatchError{} = command} = pipeline) do + Pipeline.assign(pipeline, :response, {:error, command.message}) + # Pipeline.halt(pipeline) + end + + def before_dispatch(%Pipeline{command: _command} = pipeline) do + pipeline + end + + def after_dispatch(%Pipeline{command: _command} = pipeline) do + pipeline + end + + def after_failure(%Pipeline{command: _command} = pipeline) do + pipeline + end +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/commands.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/commands.ex new file mode 100644 index 00000000..9d4bfe90 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/commands.ex @@ -0,0 +1,27 @@ +defmodule OpentelemetryCommanded.DummyApp.Commands do + @moduledoc false + + defmodule Ok do + defstruct [:id] + end + + defmodule Error do + defstruct [:id, :message] + end + + defmodule RaiseException do + defstruct [:id, :message] + end + + defmodule DoEvent do + defstruct [:id, :event] + end + + defmodule DispatchError do + defstruct [:id, :message] + end + + defmodule ProcessManagerCommand do + defstruct [:id] + end +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/event_handler.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/event_handler.ex new file mode 100644 index 00000000..3073bffa --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/event_handler.ex @@ -0,0 +1,17 @@ +defmodule OpentelemetryCommanded.DummyApp.EventHandler do + use Commanded.Event.Handler, + application: OpentelemetryCommanded.DummyApp.App, + name: "EventHandler", + start_from: :current, + consistency: :strong + + alias OpentelemetryCommanded.DummyApp.Events, as: E + + defstruct [:id] + + def handle(%E.OkEvent{}, _metadata), do: :ok + def handle(%E.ErrorInEventHandlerEvent{} = event, _metadata), do: {:error, event.message} + def handle(%E.ExceptionInEventHandlerEvent{} = event, _metadata), do: raise(event.message) + + def error(_error, _event, _ctx), do: :skip +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/events.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/events.ex new file mode 100644 index 00000000..eb3b9f7b --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/events.ex @@ -0,0 +1,28 @@ +defmodule OpentelemetryCommanded.DummyApp.Events do + @moduledoc false + + defmodule OkEvent do + @derive Jason.Encoder + defstruct [:id] + end + + defmodule ErrorInEventHandlerEvent do + @derive Jason.Encoder + defstruct [:id, :message] + end + + defmodule ExceptionInEventHandlerEvent do + @derive Jason.Encoder + defstruct [:id, :message] + end + + defmodule ErrorInProcessManagerEvent do + @derive Jason.Encoder + defstruct [:id, :message] + end + + defmodule ExceptionInProcessManagerEvent do + @derive Jason.Encoder + defstruct [:id, :message] + end +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/handler.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/handler.ex new file mode 100644 index 00000000..4abe8ff9 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/handler.ex @@ -0,0 +1,13 @@ +defmodule OpentelemetryCommanded.DummyApp.Handler do + @behaviour Commanded.Commands.Handler + + alias OpentelemetryCommanded.DummyApp.Aggregate + alias OpentelemetryCommanded.DummyApp.Commands, as: C + alias OpentelemetryCommanded.DummyApp.Events, as: E + + def handle(%Aggregate{}, %C.Ok{id: id}), do: %E.OkEvent{id: id} + def handle(%Aggregate{}, %C.Error{message: message}), do: {:error, message} + def handle(%Aggregate{}, %C.RaiseException{message: "some error"}), do: raise("some error") + def handle(%Aggregate{}, %C.DoEvent{event: event}), do: event + def handle(%Aggregate{}, %C.ProcessManagerCommand{}), do: nil +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/process_manager.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/process_manager.ex new file mode 100644 index 00000000..6c4b6601 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/process_manager.ex @@ -0,0 +1,36 @@ +defmodule OpentelemetryCommanded.DummyApp.ProcessManager do + use Commanded.ProcessManagers.ProcessManager, + application: OpentelemetryCommanded.DummyApp.App, + name: "ProcessManager" + + @derive Jason.Encoder + defstruct [:id] + + alias OpentelemetryCommanded.DummyApp.Commands, as: C + alias OpentelemetryCommanded.DummyApp.Events, as: E + + def interested?(%mod{} = event) + when mod in [E.OkEvent, E.ErrorInProcessManagerEvent, E.ExceptionInProcessManagerEvent] do + {:start, event.id} + end + + def interested?(_event) do + false + end + + def handle(_pm, %E.OkEvent{id: id}) do + %C.ProcessManagerCommand{id: id} + end + + def handle(_pm, %E.ErrorInProcessManagerEvent{} = event) do + {:error, event.message} + end + + def handle(_pm, %E.ExceptionInProcessManagerEvent{} = event) do + raise event.message + end + + def error(_error, _command_or_event, _failure_context) do + :skip + end +end diff --git a/instrumentation/opentelemetry_commanded/test/dummy_app/router.ex b/instrumentation/opentelemetry_commanded/test/dummy_app/router.ex new file mode 100644 index 00000000..149d4590 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/dummy_app/router.ex @@ -0,0 +1,19 @@ +defmodule OpentelemetryCommanded.DummyApp.Router do + @moduledoc false + + use Commanded.Commands.Router + + alias OpentelemetryCommanded.DummyApp.Aggregate + alias OpentelemetryCommanded.DummyApp.Commands, as: C + alias OpentelemetryCommanded.DummyApp.Handler + + middleware(OpentelemetryCommanded.Middleware) + middleware(OpentelemetryCommanded.DummyApp.CommandValidatorMiddleware) + + identify(Aggregate, by: :id) + + dispatch([C.Ok, C.Error, C.RaiseException, C.DoEvent, C.DispatchError, C.ProcessManagerCommand], + to: Handler, + aggregate: Aggregate + ) +end diff --git a/instrumentation/opentelemetry_commanded/test/opentelemetry_commanded_test.exs b/instrumentation/opentelemetry_commanded/test/opentelemetry_commanded_test.exs new file mode 100644 index 00000000..2562fa19 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/opentelemetry_commanded_test.exs @@ -0,0 +1,14 @@ +defmodule OpentelemetryCommandedTest do + use ExUnit.Case, async: false + doctest OpentelemetryCommanded + + test "sets it up!" do + running? = + case OpentelemetryCommanded.setup() do + :ok -> true + {:error, :already_exists} -> true + end + + assert running? + end +end diff --git a/instrumentation/opentelemetry_commanded/test/support/commanded_case.ex b/instrumentation/opentelemetry_commanded/test/support/commanded_case.ex new file mode 100644 index 00000000..4930e31d --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/support/commanded_case.ex @@ -0,0 +1,126 @@ +defmodule OpentelemetryCommanded.CommandedCase do + @moduledoc """ + A case template for tests relying on the CommandedApp + """ + + use ExUnit.CaseTemplate + + using do + quote do + import OpentelemetryCommanded.CommandedCase + end + end + + alias Commanded.Helpers.CommandAuditMiddleware + alias OpentelemetryCommanded.DummyApp.App + + require Record + + for {name, spec} <- Record.extract_all(from_lib: "opentelemetry/include/otel_span.hrl") do + Record.defrecord(name, spec) + end + + setup do + start_supervised!(CommandAuditMiddleware) + start_supervised!(App) + {:ok, _handler} = OpentelemetryCommanded.DummyApp.EventHandler.start_link() + {:ok, _pid} = OpentelemetryCommanded.DummyApp.ProcessManager.start_link(start_from: :current) + + :application.stop(:opentelemetry) + :application.set_env(:opentelemetry, :tracer, :otel_tracer_default) + + :application.set_env(:opentelemetry, :processors, [ + {:otel_batch_processor, %{scheduled_delay_ms: 1}} + ]) + + :application.start(:opentelemetry) + + :otel_batch_processor.set_exporter(:otel_exporter_pid, self()) + + %{correlation_id: "b802ced4-02de-4f12-943e-42cef58658ed"} + end + + @tracer_id __MODULE__ + + setup do + :telemetry.attach( + {__MODULE__, :start}, + [:opentelemetry_commanded, :test, :start], + &__MODULE__.handle_start/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :stop}, + [:opentelemetry_commanded, :test, :stop], + &__MODULE__.handle_stop/4, + [] + ) + + :telemetry.attach( + {__MODULE__, :exception}, + [:opentelemetry_commanded, :test, :exception], + &__MODULE__.handle_exception/4, + [] + ) + + :ok + end + + def app_dispatch(context, command) do + meta = %{} + + # Add containing span to prove context is properly passed across process boundaries + :telemetry.span([:opentelemetry_commanded, :test], meta, fn -> + { + App.dispatch(command, + application: OpentelemetryCommanded.DummyApp.App, + correlation_id: context.correlation_id, + consistency: :strong + ), + meta + } + end) + end + + def handle_start(_event, _, meta, _) do + attributes = %{ + "test_span.id": inspect(meta.telemetry_span_context, structs: false) + } + + OpentelemetryTelemetry.start_telemetry_span( + @tracer_id, + "opentelemetry_commanded.test", + meta, + %{ + kind: :internal, + attributes: attributes + } + ) + end + + def handle_stop(_event, _measurements, meta, _) do + # ensure the correct span is current + OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end + + def handle_exception( + _event, + _measurements, + %{kind: kind, reason: reason, stacktrace: stacktrace} = meta, + _config + ) do + ctx = OpentelemetryTelemetry.set_current_telemetry_span(@tracer_id, meta) + + # try to normalize all errors to Elixir exceptions + exception = Exception.normalize(kind, reason, stacktrace) + + # record exception and mark the span as errored + OpenTelemetry.Span.record_exception(ctx, exception, stacktrace) + OpenTelemetry.Span.set_status(ctx, OpenTelemetry.status(:error, inspect(reason))) + + OpentelemetryTelemetry.end_telemetry_span(@tracer_id, meta) + end +end diff --git a/instrumentation/opentelemetry_commanded/test/test_helper.exs b/instrumentation/opentelemetry_commanded/test/test_helper.exs new file mode 100644 index 00000000..869559e7 --- /dev/null +++ b/instrumentation/opentelemetry_commanded/test/test_helper.exs @@ -0,0 +1 @@ +ExUnit.start()