-
Notifications
You must be signed in to change notification settings - Fork 124
/
opentelemetry_ecto.ex
243 lines (194 loc) · 7.45 KB
/
opentelemetry_ecto.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
defmodule OpentelemetryEcto do
@moduledoc """
Telemetry handler for creating OpenTelemetry Spans from Ecto query events.
Any relation preloads, which are executed in parallel in separate tasks, will be
linked to the span of the process that initiated the call. For example:
Tracer.with_span "parent span" do
Repo.all(Query.from(User, preload: [:posts, :comments]))
end
This will create a span called `"parent span"` with three child spans, one for each
query: users, posts, and comments.
> #### Note {: .neutral}
>
> Due to limitations with how Ecto emits its telemetry, nested preloads are not
> represented as nested spans within a trace.
"""
require OpenTelemetry.Tracer
@typedoc """
Option that you can pass to `setup/2`.
"""
@typedoc since: "1.3.0"
# Each tuple below is one key the event handler reads from the options list:
#   * `:time_unit` - unit the query phase timings are converted to (from `:native`);
#     the handler falls back to `:microsecond` when it is absent.
#   * `:span_prefix` - replaces the default span-name prefix (the event name joined
#     with periods).
#   * `:additional_attributes` - map merged over the attributes built by the handler.
#   * `:db_statement` - `:enabled`, `:disabled`, or a one-argument function applied to
#     the query string before it is stored; the handler falls back to `:disabled`.
@type setup_option() ::
{:time_unit, System.time_unit()}
| {:span_prefix, String.t()}
| {:additional_attributes, %{String.t() => term()}}
| {:db_statement, :enabled | :disabled | (String.t() -> String.t())}
@doc """
Attaches the `OpentelemetryEcto` handler to the query events of your repo.

Call this from your application's `c:Application.start/2` callback on startup,
before starting the application's top-level supervisor.

`event_prefix` must be the prefix configured in the `Ecto.Repo` Telemetry configuration.
By default, it's the snake_case name of the repository module. For `MyApp.Repo`, it would
be `[:my_app, :repo]`. The handler is attached to `event_prefix ++ [:query]`.

Returns `:ok`, or `{:error, :already_exists}` when a handler has already been attached
for the same event.

For example:

    @impl Application
    def start(_type, _args) do
      OpentelemetryEcto.setup([:blog, :repo])

      children = [...]
      Supervisor.start_link(children, strategy: :one_for_one)
    end

## Options

You may also supply the following options in the second argument:

  * `:time_unit` - a time unit used to convert the values of query phase
    timings, defaults to `:microsecond`. See `System.convert_time_unit/3`.

  * `:span_prefix` - the first part of the span name.
    Defaults to the concatenation of the event name with periods, such as
    `"blog.repo.query"`. When the query has a source (the table name for SQL
    adapters), the prefix is followed by a colon and that source.
    For example: `"blog.repo.query:users"`.

  * `:additional_attributes` - additional attributes to include in the span. They are
    merged last, so if there are conflicts with default provided attributes, the ones
    provided with this config will have precedence.

  * `:db_statement` - `:disabled` (default), `:enabled`, or a function.
    Whether or not to include DB statements in the **span attributes** (as the
    `db.statement` attribute).
    Optionally provide a function that takes a query string and returns a
    sanitized version of it. This is useful for removing sensitive information from the
    query string. Unless this option is `:enabled` or a function,
    query statements will not be recorded on spans.

"""
@spec setup(:telemetry.event_name(), [setup_option()]) :: :ok | {:error, :already_exists}
def setup(event_prefix, options \\ []) when is_list(options) do
  query_event = event_prefix ++ [:query]

  # The handler id embeds the event name, so each repo prefix gets its own handler.
  handler_id = {__MODULE__, query_event}

  :telemetry.attach(handler_id, query_event, &__MODULE__.handle_event/4, options)
end
@doc false
# Telemetry callback for the `event_prefix ++ [:query]` event attached by `setup/2`.
#
# Builds a `:client` span for an already-finished query:
#   * `event`        - the telemetry event name (used for the default span prefix)
#   * `measurements` - must contain `:total_time`; the optional phase timings are
#                      picked up by `add_measurements/3`
#   * metadata       - must contain `:query`, `:source`, `:result`, `:repo` and `:type`
#   * `config`       - the options list given to `setup/2`
#
# The span start time is back-dated by `total_time` from the moment the handler runs.
def handle_event(
      event,
      measurements,
      %{query: query, source: source, result: query_result, repo: repo, type: type},
      config
    ) do
  # Doing all this even if the span isn't sampled so the sampler
  # could technically use the attributes to decide if it should sample or not

  total_time = measurements.total_time
  end_time = :opentelemetry.timestamp()
  # NOTE(review): assumes the timestamp and `total_time` share the same (native) unit.
  start_time = end_time - total_time

  # Read the repo configuration once per event and reuse it for every key below,
  # instead of re-evaluating `repo.config()` for each lookup.
  repo_config = repo.config()
  database = repo_config[:database]

  span_name = build_span_name(event, source, config)

  time_unit = Keyword.get(config, :time_unit, :microsecond)
  additional_attributes = Keyword.get(config, :additional_attributes, %{})
  db_statement_config = Keyword.get(config, :db_statement, :disabled)

  db_type =
    case type do
      :ecto_sql_query -> :sql
      _ -> type
    end

  # TODO: need connection information to complete the required attributes
  # net.peer.name or net.peer.ip and net.peer.port
  base_attributes = %{
    "db.type": db_type,
    source: source,
    "db.instance": database,
    "db.name": database,
    "db.url": db_url(repo_config),
    "total_time_#{time_unit}s": System.convert_time_unit(total_time, :native, time_unit)
  }

  attributes =
    base_attributes
    |> add_measurements(measurements, time_unit)
    |> maybe_add_db_statement(db_statement_config, query)
    |> maybe_add_db_system(repo.__adapter__())
    |> add_additional_attributes(additional_attributes)

  parent_token = attach_parent_ctx()

  try do
    span =
      OpenTelemetry.Tracer.start_span(span_name, %{
        start_time: start_time,
        attributes: attributes,
        kind: :client
      })

    case query_result do
      {:error, error} ->
        OpenTelemetry.Span.set_status(span, OpenTelemetry.status(:error, format_error(error)))

      {:ok, _} ->
        :ok
    end

    OpenTelemetry.Span.end_span(span)
  after
    # Always undo the attach above, even when building or ending the span raised,
    # so the calling process is not left running under the borrowed parent context.
    if parent_token != :undefined do
      OpenTelemetry.Ctx.detach(parent_token)
    end
  end
end

# Returns the configured `:url`, or an `ecto://hostname` URL built from `:hostname`
# when no `:url` is configured.
defp db_url(repo_config) do
  case repo_config[:url] do
    nil ->
      # TODO: add port
      URI.to_string(%URI{scheme: "ecto", host: repo_config[:hostname]})

    url ->
      url
  end
end

# Joins the span prefix (the `:span_prefix` option, or the event name joined with
# periods) with `":" <> source`; the suffix is omitted when `source` is nil.
defp build_span_name(event, source, config) do
  span_prefix =
    case Keyword.fetch(config, :span_prefix) do
      {:ok, prefix} -> prefix
      :error -> Enum.join(event, ".")
    end

  span_suffix = if source != nil, do: ":#{source}", else: ""

  span_prefix <> span_suffix
end

# Attaches the context propagated to this process, falling back to the context of the
# first entry in `$callers`. Returns the token to pass to `OpenTelemetry.Ctx.detach/1`,
# or `:undefined` when no context was found and nothing was attached.
defp attach_parent_ctx do
  parent_context =
    case OpentelemetryProcessPropagator.fetch_ctx(self()) do
      :undefined ->
        OpentelemetryProcessPropagator.fetch_parent_ctx(1, :"$callers")

      ctx ->
        ctx
    end

  if parent_context != :undefined do
    OpenTelemetry.Ctx.attach(parent_context)
  else
    :undefined
  end
end
# Turns the error of a failed query into the span status message: the exception
# message for exception-like maps, an empty string for anything else.
defp format_error(error) do
  case error do
    %{__exception__: true} = exception -> Exception.message(exception)
    _not_an_exception -> ""
  end
end
# Adds one `<phase>_<time_unit>s` attribute for each query phase timing present in
# `measurements`, converted from `:native` to `time_unit`. Phases with a nil value
# and measurements outside the four known phases are ignored.
defp add_measurements(attributes, measurements, time_unit) do
  for {phase, native_time} <- measurements,
      not is_nil(native_time) and
        phase in [:decode_time, :query_time, :queue_time, :idle_time],
      reduce: attributes do
    acc ->
      attribute_key = String.to_atom("#{phase}_#{time_unit}s")
      converted = System.convert_time_unit(native_time, :native, time_unit)

      Map.put(acc, attribute_key, converted)
  end
end
# Records the query under `:"db.statement"` according to the `:db_statement` option:
# verbatim for `:enabled`, passed through the function for a one-argument sanitizer,
# and not at all for `:disabled` or any unrecognised value.
defp maybe_add_db_statement(attributes, db_statement_config, query) do
  case db_statement_config do
    :enabled ->
      Map.put(attributes, :"db.statement", query)

    sanitizer when is_function(sanitizer, 1) ->
      Map.put(attributes, :"db.statement", sanitizer.(query))

    _disabled_or_unrecognised ->
      attributes
  end
end
# Sets `:"db.system"` for the Ecto adapters this module recognises; attributes are
# returned untouched for any other adapter.
defp maybe_add_db_system(attributes, adapter) do
  db_system =
    case adapter do
      Ecto.Adapters.Postgres -> :postgresql
      Ecto.Adapters.MyXQL -> :mysql
      Ecto.Adapters.SQLite3 -> :sqlite
      Ecto.Adapters.Tds -> :mssql
      _unknown_adapter -> nil
    end

  if db_system == nil do
    attributes
  else
    Map.put(attributes, :"db.system", db_system)
  end
end
# Merges the user-supplied `:additional_attributes` over the attributes built so far;
# on a key collision the user-supplied value wins.
# NOTE(review): the default attribute keys are atoms while the option type documents
# string keys, and those are distinct map keys in this merge - confirm how they are
# reconciled downstream.
defp add_additional_attributes(attributes, overrides) do
  Map.merge(attributes, overrides)
end
end