Skip to content

Commit

Permalink
[Exporter.Geneva] Refactor GenevaExporter (#649)
Browse files Browse the repository at this point in the history
  • Loading branch information
utpilla authored Sep 27, 2022
1 parent 20338a9 commit 7f8df02
Show file tree
Hide file tree
Showing 10 changed files with 260 additions and 129 deletions.
73 changes: 73 additions & 0 deletions src/OpenTelemetry.Exporter.Geneva/GenevaLogExporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// <copyright file="GenevaLogExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;

namespace OpenTelemetry.Exporter.Geneva;

public class GenevaLogExporter : GenevaBaseExporter<LogRecord>
{
internal bool IsUsingUnixDomainSocket;

private bool isDisposed;

private delegate ExportResult ExportLogRecordFunc(in Batch<LogRecord> batch);

private readonly ExportLogRecordFunc exportLogRecord;

private readonly IDisposable exporter;

public GenevaLogExporter(GenevaExporterOptions options)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNullOrWhitespace(options.ConnectionString);

var msgPackExporter = new MsgPackLogExporter(options);
this.IsUsingUnixDomainSocket = msgPackExporter.IsUsingUnixDomainSocket;
this.exportLogRecord = (in Batch<LogRecord> batch) => msgPackExporter.Export(in batch);
this.exporter = msgPackExporter;
}

public override ExportResult Export(in Batch<LogRecord> batch)
{
return this.exportLogRecord(batch);
}

protected override void Dispose(bool disposing)
{
if (this.isDisposed)
{
return;
}

if (disposing)
{
try
{
this.exporter.Dispose();
}
catch (Exception ex)
{
ExporterEventSource.Log.ExporterException("GenevaLogExporter Dispose failed.", ex);
}
}

this.isDisposed = true;
base.Dispose(disposing);
}
}
73 changes: 73 additions & 0 deletions src/OpenTelemetry.Exporter.Geneva/GenevaTraceExporter.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// <copyright file="GenevaTraceExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>

using System;
using System.Diagnostics;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.Geneva;

public class GenevaTraceExporter : GenevaBaseExporter<Activity>
{
internal readonly bool IsUsingUnixDomainSocket;

private bool isDisposed;

private delegate ExportResult ExportActivityFunc(in Batch<Activity> batch);

private readonly ExportActivityFunc exportActivity;

private readonly IDisposable exporter;

public GenevaTraceExporter(GenevaExporterOptions options)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNullOrWhitespace(options.ConnectionString);

var msgPackExporter = new MsgPackTraceExporter(options);
this.IsUsingUnixDomainSocket = msgPackExporter.IsUsingUnixDomainSocket;
this.exportActivity = (in Batch<Activity> batch) => msgPackExporter.Export(in batch);
this.exporter = msgPackExporter;
}

public override ExportResult Export(in Batch<Activity> batch)
{
return this.exportActivity(batch);
}

protected override void Dispose(bool disposing)
{
if (this.isDisposed)
{
return;
}

if (disposing)
{
try
{
this.exporter.Dispose();
}
catch (Exception ex)
{
ExporterEventSource.Log.ExporterException("GenevaTraceExporter Dispose failed.", ex);
}
}

this.isDisposed = true;
base.Dispose(disposing);
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="GenevaLogExporter.cs" company="OpenTelemetry Authors">
// <copyright file="MsgPackLogExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -21,12 +21,11 @@
using System.Runtime.InteropServices;
using System.Threading;
using Microsoft.Extensions.Logging;
using OpenTelemetry.Internal;
using OpenTelemetry.Logs;

namespace OpenTelemetry.Exporter.Geneva;

public class GenevaLogExporter : GenevaBaseExporter<LogRecord>
internal sealed class MsgPackLogExporter : IDisposable
{
private const int BUFFER_SIZE = 65360; // the maximum ETW payload (inclusive)
private const int MaxSanitizedEventNameLength = 50;
Expand All @@ -46,11 +45,8 @@ public class GenevaLogExporter : GenevaBaseExporter<LogRecord>
private readonly bool shouldPassThruTableMappings;
private bool isDisposed;

public GenevaLogExporter(GenevaExporterOptions options)
public MsgPackLogExporter(GenevaExporterOptions options)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNullOrWhitespace(options.ConnectionString);

// TODO: Validate mappings for reserved tablenames etc.
if (options.TableNameMappings != null)
{
Expand Down Expand Up @@ -134,7 +130,7 @@ public GenevaLogExporter(GenevaExporterOptions options)

private readonly IReadOnlyDictionary<string, string> m_tableMappings;

public override ExportResult Export(in Batch<LogRecord> batch)
public ExportResult Export(in Batch<LogRecord> batch)
{
var result = ExportResult.Success;
foreach (var logRecord in batch)
Expand All @@ -154,31 +150,6 @@ public override ExportResult Export(in Batch<LogRecord> batch)
return result;
}

protected override void Dispose(bool disposing)
{
if (this.isDisposed)
{
return;
}

if (disposing)
{
// DO NOT Dispose m_buffer as it is a static type
try
{
(this.m_dataTransport as IDisposable)?.Dispose();
this.m_prepopulatedFieldKeys.Clear();
}
catch (Exception ex)
{
ExporterEventSource.Log.ExporterException("GenevaLogExporter Dispose failed.", ex);
}
}

this.isDisposed = true;
base.Dispose(disposing);
}

internal bool IsUsingUnixDomainSocket
{
get => this.m_dataTransport is UnixDomainSocketDataTransport;
Expand Down Expand Up @@ -279,19 +250,19 @@ internal int SerializeLogRecord(LogRecord logRecord)
{
var key = this.m_prepopulatedFieldKeys[i];
var value = this.m_prepopulatedFields[key];
cursor = AddPartAField(buffer, cursor, key, value);
cursor = GenevaBaseExporter<LogRecord>.AddPartAField(buffer, cursor, key, value);
cntFields += 1;
}
}

// Part A - core envelope
if (sanitizedEventName.Length != 0)
{
cursor = AddPartAField(buffer, cursor, Schema.V40.PartA.Name, sanitizedEventName);
cursor = GenevaBaseExporter<LogRecord>.AddPartAField(buffer, cursor, Schema.V40.PartA.Name, sanitizedEventName);
}
else
{
cursor = AddPartAField(buffer, cursor, Schema.V40.PartA.Name, eventName);
cursor = GenevaBaseExporter<LogRecord>.AddPartAField(buffer, cursor, Schema.V40.PartA.Name, eventName);
}

cntFields += 1;
Expand Down Expand Up @@ -548,4 +519,25 @@ private static int SerializeSanitizedCategoryName(byte[] buffer, int cursor, str

return cursor;
}

public void Dispose()
{
if (this.isDisposed)
{
return;
}

// DO NOT Dispose m_buffer as it is a static type
try
{
(this.m_dataTransport as IDisposable)?.Dispose();
this.m_prepopulatedFieldKeys.Clear();
}
catch (Exception ex)
{
ExporterEventSource.Log.ExporterException("MsgPackLogExporter Dispose failed.", ex);
}

this.isDisposed = true;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// <copyright file="GenevaTraceExporter.cs" company="OpenTelemetry Authors">
// <copyright file="MsgPackTraceExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
Expand All @@ -20,17 +20,13 @@
using System.Linq;
using System.Runtime.InteropServices;
using System.Threading;
using OpenTelemetry.Internal;

namespace OpenTelemetry.Exporter.Geneva;

public class GenevaTraceExporter : GenevaBaseExporter<Activity>
internal sealed class MsgPackTraceExporter : IDisposable
{
public GenevaTraceExporter(GenevaExporterOptions options)
public MsgPackTraceExporter(GenevaExporterOptions options)
{
Guard.ThrowIfNull(options);
Guard.ThrowIfNullOrWhitespace(options.ConnectionString);

var partAName = "Span";
if (options.TableNameMappings != null
&& options.TableNameMappings.TryGetValue("Span", out var customTableName))
Expand Down Expand Up @@ -125,13 +121,13 @@ public GenevaTraceExporter(GenevaExporterOptions options)

// TODO: Do we support PartB as well?
// Part A - core envelope
cursor = AddPartAField(buffer, cursor, Schema.V40.PartA.Name, partAName);
cursor = GenevaBaseExporter<Activity>.AddPartAField(buffer, cursor, Schema.V40.PartA.Name, partAName);
this.m_cntPrepopulatedFields += 1;

foreach (var entry in options.PrepopulatedFields)
{
var value = entry.Value;
cursor = AddPartAField(buffer, cursor, entry.Key, value);
cursor = GenevaBaseExporter<Activity>.AddPartAField(buffer, cursor, entry.Key, value);
this.m_cntPrepopulatedFields += 1;
}

Expand All @@ -144,7 +140,7 @@ public GenevaTraceExporter(GenevaExporterOptions options)
Buffer.BlockCopy(buffer, 0, this.m_bufferEpilogue, 0, cursor - 0);
}

public override ExportResult Export(in Batch<Activity> batch)
public ExportResult Export(in Batch<Activity> batch)
{
// Note: The MessagePackSerializer takes way less time / memory than creating the activity itself.
// This makes the short-circuit check less useful.
Expand Down Expand Up @@ -173,30 +169,6 @@ public override ExportResult Export(in Batch<Activity> batch)
return result;
}

protected override void Dispose(bool disposing)
{
if (this.isDisposed)
{
return;
}

if (disposing)
{
try
{
(this.m_dataTransport as IDisposable)?.Dispose();
this.m_buffer.Dispose();
}
catch (Exception ex)
{
ExporterEventSource.Log.ExporterException("GenevaTraceExporter Dispose failed.", ex);
}
}

this.isDisposed = true;
base.Dispose(disposing);
}

internal bool IsUsingUnixDomainSocket
{
get => this.m_dataTransport is UnixDomainSocketDataTransport;
Expand Down Expand Up @@ -403,6 +375,26 @@ internal int SerializeActivity(Activity activity)
return cursor;
}

public void Dispose()
{
if (this.isDisposed)
{
return;
}

try
{
(this.m_dataTransport as IDisposable)?.Dispose();
this.m_buffer.Dispose();
}
catch (Exception ex)
{
ExporterEventSource.Log.ExporterException("MsgPackTraceExporter Dispose failed.", ex);
}

this.isDisposed = true;
}

private const int BUFFER_SIZE = 65360; // the maximum ETW payload (inclusive)

private readonly ThreadLocal<byte[]> m_buffer = new ThreadLocal<byte[]>(() => null);
Expand Down
Loading

0 comments on commit 7f8df02

Please sign in to comment.