Skip to content

Commit

Permalink
Update to latest Elastic.Ingest so we can more reliably log to selflog (
Browse files Browse the repository at this point in the history
  • Loading branch information
Mpdreamz authored Apr 19, 2023
1 parent 9aec0ab commit b8e730d
Show file tree
Hide file tree
Showing 21 changed files with 227 additions and 68 deletions.
60 changes: 36 additions & 24 deletions examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// See https://aka.ms/new-console-template for more information

using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Clients.Elasticsearch;
using Elastic.CommonSchema;
using Elastic.CommonSchema.Serilog;
using Elastic.CommonSchema.Serilog.Sink;
using Elastic.CommonSchema.Serilog.Sink.Example;
Expand All @@ -12,6 +14,7 @@
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Events;
using BulkResponse = Elastic.Ingest.Elasticsearch.Serialization.BulkResponse;
using DataStreamName = Elastic.Ingest.Elasticsearch.DataStreams.DataStreamName;
using Host = Microsoft.Extensions.Hosting.Host;
using Log = Serilog.Log;
Expand All @@ -24,6 +27,8 @@
cluster.Start(TimeSpan.FromMinutes(1));
else Console.WriteLine("Using already running Elasticsearch instance");

var waitHandle = new CountdownEvent(1);
IChannelDiagnosticsListener? listener = null;

// -- Setup Serilog --
var nodes = new[] { new Uri("http://localhost:9200") };
Expand All @@ -33,40 +38,39 @@
.Enrich.FromLogContext()
.WriteTo.Elasticsearch(nodes, opts =>
{
opts.BootstrapMethod = BootstrapMethod.Failure;
opts.BootstrapMethod = BootstrapMethod.None;
opts.DataStream = new DataStreamName("logs", "console-example");
opts.ConfigureChannel = channelOpts => {
channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 };
channelOpts.BufferOptions = new BufferOptions
{
ExportMaxConcurrency = 1,
OutboundBufferMaxSize = 2,
WaitHandle = waitHandle
};
};
}, transport =>
{
//transport.Authentication();
})
// This is the bit that Elastic.CommonSchema.Serilog.Sink introduces
.WriteTo.Elasticsearch(new ElasticsearchSinkOptions(client.Transport)
{
BootstrapMethod = BootstrapMethod.Failure,
DataStream = new DataStreamName("logs", "console-example"),
TextFormatting = new EcsTextFormatterConfiguration
{
MapCustom = (e, _) => e
},
ConfigureChannel = channelOpts => {
channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 };
}
opts.ChannelDiagnosticsCallback = l => listener = l;

})
.CreateLogger();

// -- Log 2 items and wait for flush --
Log.Logger.Information("Writing event 1");
Log.Logger.Information("Writing event 2");

if (!waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception($"No flush occurred in 10 seconds: {listener}", listener?.ObservedException);
else
{
Console.WriteLine("Successfully indexed data to Elasticsearch");
Console.WriteLine(listener);
}


// -- Setup Console Host --
/*
var consoleHost = CreateHostBuilder(args, client).Build();
await consoleHost.RunAsync();
static ElasticsearchClient CreateClient(EphemeralCluster cluster)
{
var settings = new ElasticsearchClientSettings(cluster.NodesUris().First())
.EnableDebugMode();
return new ElasticsearchClient(settings);
}
static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client)
{
Expand All @@ -83,3 +87,11 @@ static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client)
})
.UseSerilog();
}
*/
static ElasticsearchClient CreateClient(EphemeralCluster cluster)
{
var settings = new ElasticsearchClientSettings(cluster.NodesUris().First())
.EnableDebugMode();
return new ElasticsearchClient(settings);
}

Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public override void ExportToLog(Summary summary, ILogger logger)
};
Options.ChannelOptionsCallback?.Invoke(options);
var channel = new EcsDataStreamChannel<BenchmarkDocument>(options);
if (channel.DiagnosticsListener != null)
Options.ChannelDiagnosticsCallback?.Invoke(channel.DiagnosticsListener);
if (!channel.BootstrapElasticsearch(Options.BootstrapMethod)) return;

var benchmarks = CreateBenchmarkDocuments(summary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Linq;
using Elastic.Channels.Diagnostics;
using Elastic.CommonSchema.BenchmarkDotNetExporter.Domain;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.DataStreams;
Expand Down Expand Up @@ -95,6 +96,11 @@ public ElasticsearchBenchmarkExporterOptions(params Uri[] nodes)
/// <summary> Allows the user to directly change <see cref="DataStreamChannelOptions{TEvent}"/> used to export the benchmarks </summary>
public Action<DataStreamChannelOptions<BenchmarkDocument>> ChannelOptionsCallback { get; set; }

/// <summary>
/// Allows programmatic access to active channel diagnostics listener when its created.
/// </summary>
public Action<IChannelDiagnosticsListener> ChannelDiagnosticsCallback { get; set; }

private static Uri[] Parse(string urls)
{
if (string.IsNullOrWhiteSpace(urls)) throw new ArgumentException("no urls provided, empty string or null", nameof(urls));
Expand Down Expand Up @@ -146,4 +152,5 @@ internal TransportConfiguration CreateTransportConfiguration()
return settings;
}
}

}
64 changes: 52 additions & 12 deletions src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Elastic.Channels.Buffers;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using Serilog.Core;
Expand Down Expand Up @@ -46,6 +50,11 @@ public ElasticsearchSinkOptions() : this(new DefaultHttpTransport(TransportHelpe
/// </summary>
public Action<DataStreamChannelOptions<TEcsDocument>>? ConfigureChannel { get; set; }

/// <summary>
/// Allows programmatic access to active channel diagnostics listener when its created.
/// </summary>
public Action<IChannelDiagnosticsListener>? ChannelDiagnosticsCallback { get; set; }

/// <inheritdoc cref="BootstrapMethod"/>
public BootstrapMethod BootstrapMethod { get; set; }

Expand Down Expand Up @@ -73,23 +82,15 @@ public ElasticsearchSink(ElasticsearchSinkOptions<TEcsDocument> options)
_formatterConfiguration = options.TextFormatting;
var channelOptions = new DataStreamChannelOptions<TEcsDocument>(options.Transport)
{
DataStream = options.DataStream,
ExportResponseCallback = (response, _) =>
{
var errorItems = response.Items.Where(i => i.Status >= 300).ToList();
if (response.TryGetElasticsearchServerError(out var error))
SelfLog.WriteLine("{0}", error);
foreach (var errorItem in errorItems)
SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}");

}
DataStream = options.DataStream
};
options.ConfigureChannel?.Invoke(channelOptions);
_channel = new EcsDataStreamChannel<TEcsDocument>(channelOptions);
_channel = new EcsDataStreamChannel<TEcsDocument>(channelOptions, new [] { new SelfLogCallbackListener<TEcsDocument>(options)});
if (_channel.DiagnosticsListener != null)
options.ChannelDiagnosticsCallback?.Invoke(_channel.DiagnosticsListener);
_channel.BootstrapElasticsearch(options.BootstrapMethod);
}


/// <inheritdoc cref="ILogEventSink.Emit"/>
public void Emit(LogEvent logEvent)
{
Expand All @@ -98,4 +99,43 @@ public void Emit(LogEvent logEvent)
}

}

internal class SelfLogCallbackListener<TEcsDocument> : IChannelCallbacks<TEcsDocument, BulkResponse> where TEcsDocument : EcsDocument, new()
{
public Action<Exception>? ExportExceptionCallback { get; }
public Action<BulkResponse, IWriteTrackingBuffer>? ExportResponseCallback { get; }

// ReSharper disable UnassignedGetOnlyAutoProperty
public Action<int, int>? ExportItemsAttemptCallback { get; }
public Action<IReadOnlyCollection<TEcsDocument>>? ExportMaxRetriesCallback { get; }
public Action<IReadOnlyCollection<TEcsDocument>>? ExportRetryCallback { get; }
public Action? PublishToInboundChannelCallback { get; }
public Action? PublishToInboundChannelFailureCallback { get; }
public Action? PublishToOutboundChannelCallback { get; }
public Action? OutboundChannelStartedCallback { get; }
public Action? OutboundChannelExitedCallback { get; }
public Action? InboundChannelStartedCallback { get; }
public Action? PublishToOutboundChannelFailureCallback { get; }
public Action? ExportBufferCallback { get; }
public Action<int>? ExportRetryableCountCallback { get; }
// ReSharper enable UnassignedGetOnlyAutoProperty

public SelfLogCallbackListener(ElasticsearchSinkOptions<TEcsDocument> options)
{
ExportExceptionCallback = e =>
{
SelfLog.WriteLine("Observed an exception while writing to {0}", options.DataStream);
SelfLog.WriteLine("{0}", e);
};
ExportResponseCallback = (response, _) =>
{
var errorItems = response.Items.Where(i => i.Status >= 300).ToList();
if (response.TryGetElasticsearchServerError(out var error))
SelfLog.WriteLine("{0}", error);
foreach (var errorItem in errorItems)
SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}");

};
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#nullable enable
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels.Diagnostics;
using Elastic.CommonSchema;
using Elastic.CommonSchema.Elasticsearch;
using Elastic.CommonSchema.Serialization;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Serialization;

namespace Elastic.Ingest.Elasticsearch.CommonSchema
{
Expand All @@ -15,8 +18,15 @@ namespace Elastic.Ingest.Elasticsearch.CommonSchema
public class EcsDataStreamChannel<TEcsDocument> : DataStreamChannel<TEcsDocument>
where TEcsDocument : EcsDocument
{

/// <inheritdoc cref="EcsDataStreamChannel{TEcsDocument}"/>
public EcsDataStreamChannel(DataStreamChannelOptions<TEcsDocument> options) : this(options, null) { }

/// <inheritdoc cref="EcsDataStreamChannel{TEcsDocument}"/>
public EcsDataStreamChannel(DataStreamChannelOptions<TEcsDocument> options) : base(options) =>
public EcsDataStreamChannel(
DataStreamChannelOptions<TEcsDocument> options,
ICollection<IChannelCallbacks<TEcsDocument, BulkResponse>>? callbackListeners
) : base(options, callbackListeners) =>
options.WriteEvent = async (stream, ctx, @event) =>
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEcsDocument), EcsJsonConfiguration.SerializerOptions, ctx)
.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Elastic.Ingest.Elasticsearch" Version="0.3.2" />
<PackageReference Include="Elastic.Ingest.Elasticsearch" Version="0.4.3" />
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Text;
using Elastic.CommonSchema;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elasticsearch.Extensions.Logging.Options;
using Microsoft.Extensions.Logging;

Expand All @@ -24,6 +25,9 @@ public class ElasticsearchLogger : ILogger
private readonly ElasticsearchLoggerOptions _options;
private readonly IExternalScopeProvider? _scopeProvider;

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener { get; }

internal ElasticsearchLogger(
string categoryName,
IBufferedChannel<LogEvent> channel,
Expand All @@ -35,6 +39,7 @@ internal ElasticsearchLogger(
_channel = channel;
_options = options;
_scopeProvider = scopeProvider;
DiagnosticsListener = channel.DiagnosticsListener;
}

/// <inheritdoc cref="ILogger.BeginScope{TState}"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Threading;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
Expand All @@ -32,6 +33,9 @@ public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScop
private IExternalScopeProvider? _scopeProvider;
private IBufferedChannel<LogEvent> _shipper;

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener { get; }

/// <inheritdoc cref="ElasticsearchLoggerProvider"/>
public ElasticsearchLoggerProvider(IOptionsMonitor<ElasticsearchLoggerOptions> options,
IEnumerable<IChannelSetup> channelConfigurations
Expand All @@ -46,6 +50,7 @@ IEnumerable<IChannelSetup> channelConfigurations

_shipper = CreatIngestChannel(options.CurrentValue);
_optionsReloadToken = _options.OnChange(o => ReloadShipper(o));
DiagnosticsListener = _shipper.DiagnosticsListener;
}

// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ private static IConfig CreateDefaultConfig()
public void BenchmarkingPersistsResults()
{
var url = Client.ElasticsearchClientSettings.NodePool.Nodes.First().Uri;
var listener = new ChannelListener<BenchmarkDocument, BulkResponse>();
IChannelDiagnosticsListener listener = null;
var options = new ElasticsearchBenchmarkExporterOptions(url)
{
GitBranch = "externally-provided-branch",
GitCommitMessage = "externally provided git commit message",
GitRepositoryIdentifier = "repository",
BootstrapMethod = BootstrapMethod.Silent,
ChannelOptionsCallback = (o) => listener.Register(o)
ChannelDiagnosticsCallback = (l) => listener = l
};
var exporter = new ElasticsearchBenchmarkExporter(options);
var config = CreateDefaultConfig().AddExporter(exporter);
var summary = BenchmarkRunner.Run(typeof(Md5VsSha256), config);

// ensure publication was success
listener.Should().NotBeNull();
listener.PublishSuccess.Should().BeTrue("{0}", listener);

if (summary.HasCriticalValidationErrors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsTestProject>False</IsTestProject>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

[assembly: TestFramework("Elastic.Elasticsearch.Xunit.Sdk.ElasticTestFramework", "Elastic.Elasticsearch.Xunit")]

namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests
{
public class SerilogCluster : TestClusterBase
{
public SerilogCluster() : base(9205) { }
namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests;

}
public class SerilogCluster : TestClusterBase
{
public SerilogCluster() : base(9205) { }
}
Loading

0 comments on commit b8e730d

Please sign in to comment.