Skip to content

Commit

Permalink
Support for Elastic.Clients.Elasticsearch (#1935)
Browse files Browse the repository at this point in the history
* Test `Elastic.Clients.Elasticsearch` via OTel

* Suppress HTTP span creation for elasticsearch via OTel

* Cleanup

* Update README.md

* Update HttpDiagnosticListenerImplBase.cs

* Update IHttpSpanTracer.cs

* Update ElasticSearchHttpNonTracer.cs

* Update ElasticSearchHttpNonTracer.cs

* Update ElasticSearchTests.cs

* Apply suggestions from code review

Co-authored-by: Wolfgang Ziegler <[email protected]>

* Fix typo: ElasticSearch -> Elasticsearch

* Change method name on IHttpSpanTracer

`SuppressSpanCreation` -> `ShouldSuppressSpanCreation`

* Renaming leftovers

Rider "renaming" feature left me down.

* Update ElasticsearchTestFixture.cs

* Update ElasticsearchTestFixture.cs

Co-authored-by: Wolfgang Ziegler <[email protected]>
  • Loading branch information
gregkalapos and z1c0 authored Dec 7, 2022
1 parent c77bb77 commit a81ac60
Show file tree
Hide file tree
Showing 14 changed files with 416 additions and 21 deletions.
7 changes: 7 additions & 0 deletions ElasticApmAgent.sln
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Apm.Azure.Functions
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.AzureFunctionApp.Isolated", "sample\Elastic.AzureFunctionApp.Isolated\Elastic.AzureFunctionApp.Isolated.csproj", "{9879E646-63ED-474C-905B-1B469403F0A5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Elastic.Clients.Elasticsearch.Tests", "test\Elastic.Clients.Elasticsearch.Tests\Elastic.Clients.Elasticsearch.Tests.csproj", "{E6217549-8C21-4E95-BDF2-E782922CD104}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -495,6 +497,10 @@ Global
{9879E646-63ED-474C-905B-1B469403F0A5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{9879E646-63ED-474C-905B-1B469403F0A5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{9879E646-63ED-474C-905B-1B469403F0A5}.Release|Any CPU.Build.0 = Release|Any CPU
{E6217549-8C21-4E95-BDF2-E782922CD104}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{E6217549-8C21-4E95-BDF2-E782922CD104}.Debug|Any CPU.Build.0 = Debug|Any CPU
{E6217549-8C21-4E95-BDF2-E782922CD104}.Release|Any CPU.ActiveCfg = Release|Any CPU
{E6217549-8C21-4E95-BDF2-E782922CD104}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -575,6 +581,7 @@ Global
{A124FE13-1C70-4D59-8577-9BD4204923CA} = {3734A52F-2222-454B-BF58-1BA5C1F29D77}
{8E3D94CD-3378-4431-8956-07644A64D4E8} = {267A241E-571F-458F-B04C-B6C4DE79E735}
{9879E646-63ED-474C-905B-1B469403F0A5} = {3C791D9C-6F19-4F46-B367-2EC0F818762D}
{E6217549-8C21-4E95-BDF2-E782922CD104} = {267A241E-571F-458F-B04C-B6C4DE79E735}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {69E02FD9-C9DE-412C-AB6B-5B8BECC6BFA5}
Expand Down
2 changes: 2 additions & 0 deletions src/Elastic.Apm.Azure.CosmosDb/AzureCosmosDbTracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,8 @@ public ISpan StartSpan(IApmAgent agent, string method, Uri requestUrl, Func<stri
return span;
}

public bool ShouldSuppressSpanCreation() => false;

private static string GetOperationName(string operation) =>
OperationNames.TryGetValue(operation, out var operationName) ? operationName : operation;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
// See the LICENSE file in the project root for more information

using System;
using System.Text.RegularExpressions;
using Elastic.Apm.Api;
using Elastic.Apm.DiagnosticListeners;
using Elastic.Apm.Model;
Expand Down Expand Up @@ -126,5 +125,7 @@ public ISpan StartSpan(IApmAgent agent, string method, Uri requestUrl, Func<stri

return span;
}

public bool ShouldSuppressSpanCreation() => false;
}
}
4 changes: 2 additions & 2 deletions src/Elastic.Apm/AgentComponents.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ internal AgentComponents(
ConfigurationStore = new ConfigurationStore(new ConfigurationSnapshotFromReader(ConfigurationReader, "local"), Logger);

ApmServerInfo = apmServerInfo ?? new ApmServerInfo();
HttpTraceConfiguration = new HttpTraceConfiguration();

// Called by PayloadSenderV2 after the ServerInfo is fetched
Action<bool, IApmServerInfo> serverInfoCallback = null;
Expand All @@ -65,7 +66,7 @@ internal AgentComponents(
ElasticActivityListener activityListener = null;
if (ConfigurationReader.EnableOpenTelemetryBridge)
{
activityListener = new ElasticActivityListener(this);
activityListener = new ElasticActivityListener(this, HttpTraceConfiguration);

serverInfoCallback = (success, serverInfo) =>
{
Expand Down Expand Up @@ -103,7 +104,6 @@ internal AgentComponents(
if (ConfigurationReader.Enabled)
breakdownMetricsProvider ??= new BreakdownMetricsProvider(Logger);

HttpTraceConfiguration = new HttpTraceConfiguration();
SubscribedListeners = new HashSet<Type>();

// initialize the tracer before central configuration or metric collection is started
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,17 @@ private void ProcessStartEvent(TRequest request, Uri requestUrl)

var method = RequestGetMethod(request);
ISpan span = null;
var suppressSpanCreation = false;
if (_configuration?.HasTracers ?? false)
{
using (var httpTracers = _configuration.GetTracers())
{
foreach (var httpSpanTracer in httpTracers)
{
suppressSpanCreation = httpSpanTracer.ShouldSuppressSpanCreation();
if (suppressSpanCreation)
break;

if (httpSpanTracer.IsMatch(method, requestUrl,
header => RequestTryGetHeader(request, header, out var value) ? value : null))
{
Expand All @@ -165,6 +170,14 @@ private void ProcessStartEvent(TRequest request, Uri requestUrl)
{
if (_configuration?.CaptureSpan ?? false)
{
if (suppressSpanCreation)
{
Logger.Trace()
?.Log("Skip creating span for outgoing HTTP request to {RequestUrl} as it was suppressed by an HttpSpanTracer",
requestUrl.Sanitize());
return;
}

span = ExecutionSegmentCommon.StartSpanOnCurrentExecutionSegment(ApmAgent, $"{method} {requestUrl.Host}",
ApiConstants.TypeExternal, ApiConstants.SubtypeHttp, InstrumentationFlag.HttpClient, true, true);

Expand Down
15 changes: 15 additions & 0 deletions src/Elastic.Apm/DiagnosticListeners/IHttpSpanTracer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@

namespace Elastic.Apm.DiagnosticListeners
{
/// <summary>
/// Utility interface for the generic HTTP request capturing mechanism.
/// For specific HTTP calls, it can handle the creation of the span and it
/// can also completely suppress span creation.
/// </summary>
internal interface IHttpSpanTracer
{
/// <summary>
Expand All @@ -29,5 +34,15 @@ internal interface IHttpSpanTracer
/// <param name="headerGetter">A delegate to retrieve a HTTP header</param>
/// <returns>A new instance of a <see cref="ISpan"/>. Can return null</returns>
ISpan StartSpan(IApmAgent agent, string method, Uri requestUrl, Func<string, string> headerGetter);

/// <summary>
/// Determines if a span should be captured for the given HTTP all.
/// </summary>
/// <returns>
/// <c>true</c> if a span should be captured for a given call (either created in the <see cref="StartSpan" /> or
/// if it fails then by the generic HTTP listener). <c>false</c> if span creation should be suppressed for the given HTTP
/// call
/// </returns>
bool ShouldSuppressSpanCreation();
}
}
15 changes: 8 additions & 7 deletions src/Elastic.Apm/Elastic.Apm.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
<InternalsVisibleTo Include="Elastic.Apm.Profiler.Managed" Key="$(ExposedPublicKey)" />
<InternalsVisibleTo Include="Elastic.Apm.Profiler.Managed.Tests" Key="$(ExposedPublicKey)" />
<InternalsVisibleTo Include="Elastic.Apm.StaticExplicitInitialization.Tests" Key="$(ExposedPublicKey)" />
<InternalsVisibleTo Include="Elastic.Clients.Elasticsearch.Tests" Key="$(ExposedPublicKey)" />
</ItemGroup>

<ItemGroup Condition="'$(DiagnosticSourceVersion)' == ''">
Expand All @@ -66,20 +67,20 @@
compile multiple versions of the agent.
-->
<ItemGroup Condition="'$(DiagnosticSourceVersion)' != ''">
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="$(DiagnosticSourceVersion)"/>
<PackageReference Include="System.Diagnostics.DiagnosticSource" Version="$(DiagnosticSourceVersion)" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0"/>
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
</ItemGroup>
<ItemGroup Condition="'$(TargetFramework)' == 'net5.0' or '$(TargetFramework)' == 'net6.0'">
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0"/>
<PackageReference Include="System.Diagnostics.PerformanceCounter" Version="4.5.0" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="2.0.2"/>
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0"/>
<PackageReference Include="Microsoft.Diagnostics.Tracing.TraceEvent" Version="2.0.2" />
<PackageReference Include="System.Threading.Tasks.Dataflow" Version="4.9.0" />
<!-- Used by Ben.Demystifier -->
<PackageReference Include="System.Reflection.Metadata" Version="5.0.0"/>
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4"/>
<PackageReference Include="System.Reflection.Metadata" Version="5.0.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.4" />
</ItemGroup>

<!-- Newtonsoft.Json constants -->
Expand Down
16 changes: 5 additions & 11 deletions src/Elastic.Apm/OpenTelemetry/ElasticActivityListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ public class ElasticActivityListener : IDisposable
private readonly ConcurrentDictionary<string, Span> ActiveSpans = new();
private readonly ConcurrentDictionary<string, Transaction> ActiveTransactions = new();

internal ElasticActivityListener(IApmAgent agent) => _logger = agent.Logger?.Scoped(nameof(ElasticActivityListener));
internal ElasticActivityListener(IApmAgent agent, HttpTraceConfiguration httpTraceConfiguration) => (_logger, _httpTraceConfiguration) =
(agent.Logger?.Scoped(nameof(ElasticActivityListener)), httpTraceConfiguration);

private readonly IApmLogger _logger;
private Tracer _tracer;
private readonly HttpTraceConfiguration _httpTraceConfiguration;

internal void Start(Tracer tracerInternal)
{
_httpTraceConfiguration?.AddTracer(new ElasticSearchHttpNonTracer());

_tracer = tracerInternal;
Listener = new ActivityListener
{
Expand Down Expand Up @@ -154,8 +158,6 @@ internal void Start(Tracer tracerInternal)
case ActivityStatusCode.Error:
transaction.Outcome = Outcome.Failure;
break;
default:
break;
}
#endif

Expand Down Expand Up @@ -187,8 +189,6 @@ internal void Start(Tracer tracerInternal)
case ActivityStatusCode.Error:
span.Outcome = Outcome.Failure;
break;
default:
break;
}
#endif
span.End();
Expand All @@ -203,17 +203,11 @@ private void InferTransactionType(Transaction transaction, Activity activity)
var isMessaging = activity.Tags.Any(n => n.Key == "messaging.system");

if (activity.Kind == ActivityKind.Server && (isRpc || isHttp))
{
transaction.Type = ApiConstants.TypeRequest;
}
else if (activity.Kind == ActivityKind.Consumer && isMessaging)
{
transaction.Type = ApiConstants.TypeMessaging;
}
else
{
transaction.Type = "unknown";
}
}

private ActivityListener Listener { get; set; }
Expand Down
34 changes: 34 additions & 0 deletions src/Elastic.Apm/OpenTelemetry/ElasticSearchHttpNonTracer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Licensed to Elasticsearch B.V under
// one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
#if NET5_0_OR_GREATER

using System;
using System.Diagnostics;
using System.Linq;
using Elastic.Apm.Api;
using Elastic.Apm.DiagnosticListeners;

namespace Elastic.Apm.OpenTelemetry
{
/// <summary>
/// Handles HTTP spans for outgoing HTTP calls from `Elastic.Clients.ElasticSearch`.
/// Since `Elastic.Clients.ElasticSearch` emits <see cref="Activity"/>, and according to our spec, for calls into elasticsearch
/// we don't need to create an extra HTTP span, all this does is that it suppresses span creation during the outgoing HTTP call.
/// </summary>
public class ElasticSearchHttpNonTracer : IHttpSpanTracer
{
public bool IsMatch(string method, Uri requestUrl, Func<string, string> headerGetter) => false;
public ISpan StartSpan(IApmAgent agent, string method, Uri requestUrl, Func<string, string> headerGetter) => null;

public bool ShouldSuppressSpanCreation()
{
if (Activity.Current == null || Activity.Current.Parent == null)
return false;
return Activity.Current.Parent.DisplayName.StartsWith("Elasticsearch:") && Activity.Current.Parent.Tags.Any(n => n is { Key: "db.system", Value: "elasticsearch" });
}
}
}

#endif
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<InternalsVisibleTo Include="Elastic.Apm.Azure.CosmosDb.Tests" Key="$(ExposedPublicKey)" />
<InternalsVisibleTo Include="Elastic.Apm.Azure.Storage.Tests" Key="$(ExposedPublicKey)" />
<InternalsVisibleTo Include="Elastic.Apm.MongoDb.Tests" Key="$(ExposedPublicKey)" />
<InternalsVisibleTo Include="Elastic.Clients.Elasticsearch.Tests" Key="$(ExposedPublicKey)" />
</ItemGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net6.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="FluentAssertions" Version="5.6.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3" />
<PackageReference Include="Elastic.Clients.Elasticsearch" Version="8.0.1" />
<PackageReference Include="Testcontainers" Version="2.2.0" />
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.4.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\src\Elastic.Apm\Elastic.Apm.csproj" />
<ProjectReference Include="..\Elastic.Apm.Tests.Utilities\Elastic.Apm.Tests.Utilities.csproj" />
</ItemGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to Elasticsearch B.V under
// one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information

using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Configurations;
using DotNet.Testcontainers.Containers;
using Elastic.Transport;
using Xunit;

namespace Elastic.Clients.Elasticsearch.Tests;

public class ElasticsearchTestFixture : IAsyncDisposable, IAsyncLifetime
{
public ElasticsearchTestcontainer Container { get; }
public ElasticsearchClient? Client { get; private set; }

private readonly TestcontainerDatabaseConfiguration _configuration = new ElasticsearchTestcontainerConfiguration { Password = "secret" };

public ElasticsearchTestFixture() =>
Container = new TestcontainersBuilder<ElasticsearchTestcontainer>()
.WithImage("docker.elastic.co/elasticsearch/elasticsearch:8.5")
.WithDatabase(_configuration)
.Build();

public async Task InitializeAsync()
{
await Container.StartAsync();

var settings = new ElasticsearchClientSettings(new Uri(Container.ConnectionString))
.ServerCertificateValidationCallback(CertificateValidations.AllowAll)
.Authentication(new BasicAuthentication(Container.Username, Container.Password));

Client = new ElasticsearchClient(settings);
if (Client == null)
throw new Exception("`new ElasticsearchClient(settings)` returned `null`");
}

async Task IAsyncLifetime.DisposeAsync()
{
if (Container.State == TestcontainersStates.Running)
{
await Container.StopAsync();
await Container.DisposeAsync();
}
}

async ValueTask IAsyncDisposable.DisposeAsync()
{
if (Container.State == TestcontainersStates.Running)
{
await Container.StopAsync();
await Container.DisposeAsync();
}
}
}
Loading

0 comments on commit a81ac60

Please sign in to comment.