Skip to content

Commit

Permalink
Write directly to output stream in OtlpHttpTraceExportClient. (#2490)
Browse files Browse the repository at this point in the history
  • Loading branch information
CodeBlanch authored Oct 16, 2021
1 parent f8f2018 commit d119447
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,14 @@

using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
#if NET5_0_OR_GREATER
using System.Threading;
#endif
using System.Threading.Tasks;
using Google.Protobuf;
using OtlpCollector = Opentelemetry.Proto.Collector.Trace.V1;

Expand All @@ -43,18 +49,48 @@ protected override HttpRequestMessage CreateHttpRequest(OtlpCollector.ExportTrac
request.Headers.Add(header.Key, header.Value);
}

var content = Array.Empty<byte>();
using (var stream = new MemoryStream())
request.Content = new ExportRequestContent(exportRequest);

return request;
}

internal sealed class ExportRequestContent : HttpContent
{
private static readonly MediaTypeHeaderValue ProtobufMediaTypeHeader = new MediaTypeHeaderValue(MediaContentType);

private readonly OtlpCollector.ExportTraceServiceRequest exportRequest;

public ExportRequestContent(OtlpCollector.ExportTraceServiceRequest exportRequest)
{
this.exportRequest = exportRequest;
this.Headers.ContentType = ProtobufMediaTypeHeader;
}

#if NET5_0_OR_GREATER
protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken cancellationToken)
{
exportRequest.WriteTo(stream);
content = stream.ToArray();
this.SerializeToStreamInternal(stream);
}
#endif

var binaryContent = new ByteArrayContent(content);
binaryContent.Headers.ContentType = new MediaTypeHeaderValue(MediaContentType);
request.Content = binaryContent;
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
this.SerializeToStreamInternal(stream);
return Task.CompletedTask;
}

return request;
protected override bool TryComputeLength(out long length)
{
// We can't know the length of the content being pushed to the output stream.
length = -1;
return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void SerializeToStreamInternal(Stream stream)
{
this.exportRequest.WriteTo(stream);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="OtlpExporterBenchmarks.cs" company="OpenTelemetry Authors">
// <copyright file="OtlpGrpcExporterBenchmarks.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand Down Expand Up @@ -29,7 +29,7 @@
namespace Benchmarks.Exporter
{
[MemoryDiagnoser]
public class OtlpExporterBenchmarks
public class OtlpGrpcExporterBenchmarks
{
private OtlpTraceExporter exporter;
private Activity activity;
Expand Down
104 changes: 104 additions & 0 deletions test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// <copyright file="OtlpHttpExporterBenchmarks.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics;
using System.IO;
using BenchmarkDotNet.Attributes;
using Benchmarks.Helper;
using OpenTelemetry;
using OpenTelemetry.Exporter;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Internal;
using OpenTelemetry.Tests;

namespace Benchmarks.Exporter
{
[MemoryDiagnoser]
public class OtlpHttpExporterBenchmarks
{
private readonly byte[] buffer = new byte[1024 * 1024];
private IDisposable server;
private string serverHost;
private int serverPort;
private OtlpTraceExporter exporter;
private Activity activity;
private CircularBuffer<Activity> activityBatch;

[Params(1, 10, 100)]
public int NumberOfBatches { get; set; }

[Params(10000)]
public int NumberOfSpans { get; set; }

[GlobalSetup]
public void GlobalSetup()
{
this.server = TestHttpServer.RunServer(
(ctx) =>
{
using (Stream receiveStream = ctx.Request.InputStream)
{
while (true)
{
if (receiveStream.Read(this.buffer, 0, this.buffer.Length) == 0)
{
break;
}
}
}

ctx.Response.StatusCode = 200;
ctx.Response.OutputStream.Close();
},
out this.serverHost,
out this.serverPort);

var options = new OtlpExporterOptions
{
Endpoint = new Uri($"http://{this.serverHost}:{this.serverPort}"),
};
this.exporter = new OtlpTraceExporter(
options,
new OtlpHttpTraceExportClient(options));

this.activity = ActivityHelper.CreateTestActivity();
this.activityBatch = new CircularBuffer<Activity>(this.NumberOfSpans);
}

[GlobalCleanup]
public void GlobalCleanup()
{
this.exporter.Shutdown();
this.exporter.Dispose();
this.server.Dispose();
}

[Benchmark]
public void OtlpExporter_Batching()
{
for (int i = 0; i < this.NumberOfBatches; i++)
{
for (int c = 0; c < this.NumberOfSpans; c++)
{
this.activityBatch.Add(this.activity);
}

this.exporter.Export(new Batch<Activity>(this.activityBatch, this.NumberOfSpans));
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ void RunTest(Batch<Activity> batch)
Assert.Contains(httpRequest.Headers, h => h.Key == header2.Name && h.Value.First() == header2.Value);

Assert.NotNull(httpRequest.Content);
Assert.IsType<ByteArrayContent>(httpRequest.Content);
Assert.IsType<OtlpHttpTraceExportClient.ExportRequestContent>(httpRequest.Content);
Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == OtlpHttpTraceExportClient.MediaContentType);

var exportTraceRequest = OtlpCollector.ExportTraceServiceRequest.Parser.ParseFrom(httpRequestContent);
Expand Down

0 comments on commit d119447

Please sign in to comment.