Skip to content

Commit

Permalink
initial implementation with support for producer
Browse files Browse the repository at this point in the history
  • Loading branch information
vhatsura committed Aug 5, 2022
1 parent a7e4011 commit 7a0dd8d
Show file tree
Hide file tree
Showing 13 changed files with 385 additions and 1 deletion.
23 changes: 23 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
[*]
indent_style = space
indent_size = 2
end_of_line = lf
charset = utf-8
trim_trailing_whitespace = true
insert_final_newline = true

[*.md]
indent_size = 4
trim_trailing_whitespace = false

[*.sln]
indent_style = tab

[*.cs]
indent_size = 4
max_line_length = 120
csharp_max_line_length = 120

[*.{csproj,props,targets}]
indent_size = 4
trim_trailing_whitespace = false
12 changes: 12 additions & 0 deletions .github/dependabot.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
version: 2
updates:
- package-ecosystem: "github-actions"
directory: "/"
schedule:
interval: daily

- package-ecosystem: nuget
directory: "/"
schedule:
interval: daily
open-pull-requests-limit: 5
34 changes: 34 additions & 0 deletions .github/workflows/continuous.integration.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
name: Continuous Integration Check

on:
pull_request:
branches: [ main ]
types: [ opened, synchronize, reopened, ready_for_review ]

jobs:

ci:
runs-on: ubuntu-latest
env:
DOTNET_NOLOGO: true
DOTNET_CLI_TELEMETRY_OPTOUT: true

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Setup .NET Core 6.0.x
uses: actions/setup-dotnet@v2
with:
dotnet-version: 6.0.x

- name: Install dependencies
run: dotnet restore

- name: Build
run: dotnet build --configuration Release --no-restore

- name: Test
run: dotnet test --configuration Release --no-build --verbosity normal
27 changes: 27 additions & 0 deletions .github/workflows/linter.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
---
name: Lint Code Base

on:
pull_request:
branches: [ main ]
types: [ opened, synchronize, reopened, ready_for_review ]

jobs:
lint:

name: Lint Code Base
runs-on: ubuntu-latest

steps:
- name: Checkout Code
uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Lint Code Base
uses: github/super-linter@v4
env:
VALIDATE_ALL_CODEBASE: false
VALIDATE_EDITORCONFIG: false
DEFAULT_BRANCH: main
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
48 changes: 48 additions & 0 deletions .github/workflows/nuget.publish.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
---
name: Publish NuGet package

on:
release:
types: [ published ]

jobs:

publish:
runs-on: ubuntu-latest
env:
DOTNET_NOLOGO: true
DOTNET_CLI_TELEMETRY_OPTOUT: true

steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0

- name: Setup .NET Core 6.0.x
uses: actions/setup-dotnet@v2
with:
dotnet-version: 6.0.x

- name: Install dependencies
run: dotnet restore

- name: Build
run: dotnet build --configuration Release --no-restore

- name: Pack
run: dotnet pack --configuration Release --no-build --include-symbols --output ./nupkgs

- name: Archive NuGet artifacts
uses: actions/upload-artifact@v3
with:
name: nuget-packages
path: ./nupkgs/*.nupkg

- name: Archive NuGet symbols artifacts
uses: actions/upload-artifact@v3
with:
name: nuget-symbol-packages
path: ./nupkgs/*.snupkg

- name: Push
run: dotnet nuget push './nupkgs/*.nupkg' -k ${{ secrets.NUGET_API_KEY }} -s https://api.nuget.org/v3/index.json
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -348,3 +348,8 @@ MigrationBackup/

# Ionide (cross platform F# VS Code tools) working folder
.ionide/

# Jetbrains Rider
.idea/

.DS_Store
8 changes: 8 additions & 0 deletions GitVersion.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
mode: ContinuousDeployment
branches:
main:
tag: alpha
ignore:
sha: [ ]
merge-message-formats: { }
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1 +1,17 @@
# confluent-kafka-extensions-diagnostics
# Confluent.Kafka.Extensions.Diagnostics

![GitHub Actions Badge](https://github.com/vhatsura/confluent-kafka-extensions-diagnostics/actions/workflows/continuous.integration.yml/badge.svg)
[![NuGet Badge](https://buildstats.info/nuget/Confluent.Kafka.Extensions.Diagnostics)](https://www.nuget.org/packages/Confluent.Kafka.Extensions.Diagnostics/)

## Installation

```powershell
Install-Package Confluent.Kafka.Extensions.Diagnostics
```

## Usage

```csharp
```

## Roadmap
16 changes: 16 additions & 0 deletions confluent-kafka-extensions-diagnostics.sln
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

Microsoft Visual Studio Solution File, Format Version 12.00
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Confluent.Kafka.Extensions.Diagnostics", "src\Confluent.Kafka.Extensions.Diagnostics\Confluent.Kafka.Extensions.Diagnostics.csproj", "{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6B0AFF93-45AF-4A3A-BF4E-8D916AA1592B}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
EndGlobal
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
using System.Diagnostics;
using System.Text;

namespace Confluent.Kafka.Extensions.Diagnostics;

internal static class ActivityDiagnosticsHelper
{
private const string ActivitySourceName = "Confluent.Kafka.Extensions.Diagnostics";

private static ActivitySource ActivitySource { get; } = new(ActivitySourceName);

internal static Activity? Start<TKey, TValue>(TopicPartition partition, Message<TKey, TValue> message)
{
try
{
Activity? activity = ActivitySource.StartActivity("Confluent.Kafka.Produce", ActivityKind.Client,
default(ActivityContext),
new[]
{
new KeyValuePair<string, object>("messaging.system", "kafka"),
new KeyValuePair<string, object>("messaging.destination", partition.Topic),
new KeyValuePair<string, object>("messaging.destination_kind", "topic"),
new KeyValuePair<string, object>("messaging.kafka.partition", partition.Partition.ToString())
}!);

if (activity == null) return null;

if (activity.IsAllDataRequested)
{
if (message.Key != null)
{
activity.SetTag("messaging.kafka.message_key", message.Key.ToString());
}

if (message.Value != null)
{
int messagePayloadBytes = Encoding.UTF8.GetByteCount(message.Value.ToString()!);
activity.AddTag("messaging.message_payload_size_bytes", messagePayloadBytes.ToString());
}
}

if (message.Headers == null)
{
message.Headers = new Headers();
}

if (activity.Id != null)
message.Headers.Add("traceparent", Encoding.UTF8.GetBytes(activity.Id));

var tracestateStr = activity.Context.TraceState;
if (tracestateStr?.Length > 0)
{
message.Headers.Add("tracestate", Encoding.UTF8.GetBytes(tracestateStr));
}

return activity;
}
catch
{
// ignore
return null;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<TargetFramework>net6.0</TargetFramework>

<GenerateDocumentationFile>true</GenerateDocumentationFile>
<IncludeSymbols>true</IncludeSymbols>
<SymbolPackageFormat>snupkg</SymbolPackageFormat>
<EmbedUntrackedSources>true</EmbedUntrackedSources>
<Deterministic>true</Deterministic>
<ContinuousIntegrationBuild Condition="'$(GITHUB_ACTIONS)' == 'true'">true</ContinuousIntegrationBuild>
<EnableNETAnalyzers>true</EnableNETAnalyzers>
<AnalysisMode>All</AnalysisMode>
<AnalysisLevel>latest</AnalysisLevel>

<Authors>Vadim Hatsura</Authors>
<Description>Confluent.Kafka instrumentationusing the [System.Diagnostics.Activity](https://docs.microsoft.com/en-us/dotnet/api/system.diagnostics.activity) API</Description>
<PackageTags>kafka, confluent, diagnostics</PackageTags>
<Copyright>Copyright (c) 2022 - Present</Copyright>
<RepositoryType>git</RepositoryType>
<PublishRepositoryUrl>true</PublishRepositoryUrl>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="[1.6.2, 2.0.0)"/>
<PackageReference Include="GitVersion.MsBuild" Version="5.10.3">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="Microsoft.SourceLink.GitHub" Version="1.1.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

</Project>
82 changes: 82 additions & 0 deletions src/Confluent.Kafka.Extensions.Diagnostics/InstrumentedProducer.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
namespace Confluent.Kafka.Extensions.Diagnostics;

internal class InstrumentedProducer<TKey, TValue> : IProducer<TKey, TValue>
{
private readonly IProducer<TKey, TValue> _producerImplementation;

public InstrumentedProducer(IProducer<TKey, TValue> producer)
{
_producerImplementation = producer;
}

public void Dispose() => _producerImplementation.Dispose();

public int AddBrokers(string brokers) => _producerImplementation.AddBrokers(brokers);

public Handle Handle => _producerImplementation.Handle;

public string Name => _producerImplementation.Name;

public Task<DeliveryResult<TKey, TValue>> ProduceAsync(
string topic, Message<TKey, TValue> message, CancellationToken cancellationToken = new CancellationToken()) =>
ProduceAsync(new TopicPartition(topic, Partition.Any), message, cancellationToken);

public async Task<DeliveryResult<TKey, TValue>> ProduceAsync(
TopicPartition topicPartition, Message<TKey, TValue> message,
CancellationToken cancellationToken = new CancellationToken())
{
var activity = ActivityDiagnosticsHelper.Start(topicPartition, message);

try
{
return await _producerImplementation.ProduceAsync(topicPartition, message, cancellationToken);
}
finally
{
activity?.Stop();
}
}

public void Produce(
string topic, Message<TKey, TValue> message, Action<DeliveryReport<TKey, TValue>> deliveryHandler = null) =>
Produce(new TopicPartition(topic, Partition.Any), message, deliveryHandler);

public void Produce(
TopicPartition topicPartition, Message<TKey, TValue> message,
Action<DeliveryReport<TKey, TValue>> deliveryHandler = null)
{
var activity = ActivityDiagnosticsHelper.Start(topicPartition, message);

try
{
_producerImplementation.Produce(topicPartition, message, deliveryHandler);
}
finally
{
activity?.Stop();
}
}

public int Poll(TimeSpan timeout) => _producerImplementation.Poll(timeout);

public int Flush(TimeSpan timeout) => _producerImplementation.Flush(timeout);

public void Flush(CancellationToken cancellationToken = new CancellationToken()) =>
_producerImplementation.Flush(cancellationToken);

public void InitTransactions(TimeSpan timeout) => _producerImplementation.InitTransactions(timeout);

public void BeginTransaction() => _producerImplementation.BeginTransaction();

public void CommitTransaction(TimeSpan timeout) => _producerImplementation.CommitTransaction(timeout);

public void CommitTransaction() => _producerImplementation.CommitTransaction();

public void AbortTransaction(TimeSpan timeout) => _producerImplementation.AbortTransaction(timeout);

public void AbortTransaction() => _producerImplementation.AbortTransaction();

public void SendOffsetsToTransaction(
IEnumerable<TopicPartitionOffset> offsets, IConsumerGroupMetadata groupMetadata, TimeSpan timeout) =>
_producerImplementation.SendOffsetsToTransaction(offsets, groupMetadata, timeout);
}
Loading

0 comments on commit 7a0dd8d

Please sign in to comment.