From 2112a4653737de33c9b9f81636863b9cf4f3ebdb Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 18 Apr 2023 16:01:40 +0200 Subject: [PATCH 1/2] Update to latest Elastic.Ingest so we can more reliably log to selflog Elastic.Ingest now models the callbacks more rigourously allowing implementations to inject their own into the channel without overriding anything the channel itself might rely on or the user might override in their channeloptions. This allows e.g the serilog implementation to inject its own listeners in isolation to log to SelfLog in isolation --- .../Program.cs | 60 ++++++++------ .../ElasticsearchBenchmarkExporter.cs | 2 + .../ElasticsearchBenchmarkExporterOptions.cs | 7 ++ .../ElasticsearchSink.cs | 64 ++++++++++++--- .../EcsDataStreamChannel.cs | 12 ++- ...c.Ingest.Elasticsearch.CommonSchema.csproj | 2 +- .../ElasticsearchLogger.cs | 5 ++ .../BdNetExporterTests.cs | 5 +- ...hema.Serilog.Sinks.IntegrationTests.csproj | 1 + .../SerilogCluster.cs | 10 +-- .../SerilogOutputTests.cs | 13 ++- .../SerilogSelfLogTests.cs | 79 +++++++++++++++++++ .../SerilogTestBase.cs | 8 +- .../DataStreamIngestionTests.cs | 3 +- .../IndexIngestionTests.cs | 3 +- .../TestBase.cs | 5 +- .../Elasticsearch.IntegrationDefaults.csproj | 2 +- .../IngestionCluster.cs | 4 +- 18 files changed, 219 insertions(+), 66 deletions(-) create mode 100644 tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs diff --git a/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs b/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs index b66cddef..94189b73 100644 --- a/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs +++ b/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs @@ -1,7 +1,9 @@ // See https://aka.ms/new-console-template for more information using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elastic.Clients.Elasticsearch; +using Elastic.CommonSchema; using Elastic.CommonSchema.Serilog; using Elastic.CommonSchema.Serilog.Sink; using Elastic.CommonSchema.Serilog.Sink.Example; @@ -12,6 +14,7 @@ using Microsoft.Extensions.Hosting; using Serilog; using Serilog.Events; +using BulkResponse = Elastic.Ingest.Elasticsearch.Serialization.BulkResponse; using DataStreamName = Elastic.Ingest.Elasticsearch.DataStreams.DataStreamName; using Host = Microsoft.Extensions.Hosting.Host; using Log = Serilog.Log; @@ -24,6 +27,8 @@ cluster.Start(TimeSpan.FromMinutes(1)); else Console.WriteLine("Using already running Elasticsearch instance"); +var waitHandle = new CountdownEvent(1); +IChannelDiagnosticsListener? listener = null; // -- Setup Serilog -- var nodes = new[] { new Uri("http://localhost:9200") }; @@ -33,40 +38,39 @@ .Enrich.FromLogContext() .WriteTo.Elasticsearch(nodes, opts => { - opts.BootstrapMethod = BootstrapMethod.Failure; + opts.BootstrapMethod = BootstrapMethod.None; opts.DataStream = new DataStreamName("logs", "console-example"); opts.ConfigureChannel = channelOpts => { - channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 }; + channelOpts.BufferOptions = new BufferOptions + { + ExportMaxConcurrency = 1, + OutboundBufferMaxSize = 2, + WaitHandle = waitHandle + }; }; - }, transport => - { - //transport.Authentication(); - }) - // This is the bit that Elastic.CommonSchema.Serilog.Sink introduces - .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(client.Transport) - { - BootstrapMethod = BootstrapMethod.Failure, - DataStream = new DataStreamName("logs", "console-example"), - TextFormatting = new EcsTextFormatterConfiguration - { - MapCustom = (e, _) => e - }, - ConfigureChannel = channelOpts => { - channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 }; - } + opts.ChannelDiagnosticsCallback = l => listener = l; + }) .CreateLogger(); +// -- Log 2 items and wait for flush -- +Log.Logger.Information("Writing event 1"); +Log.Logger.Information("Writing event 2"); + +if (!waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {listener}", listener?.ObservedException); +else +{ + Console.WriteLine("Successfully indexed data to Elasticsearch"); + Console.WriteLine(listener); +} + + // -- Setup Console Host -- +/* var consoleHost = CreateHostBuilder(args, client).Build(); await consoleHost.RunAsync(); -static ElasticsearchClient CreateClient(EphemeralCluster cluster) -{ - var settings = new ElasticsearchClientSettings(cluster.NodesUris().First()) - .EnableDebugMode(); - return new ElasticsearchClient(settings); -} static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client) { @@ -83,3 +87,11 @@ static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client) }) .UseSerilog(); } +*/ +static ElasticsearchClient CreateClient(EphemeralCluster cluster) +{ + var settings = new ElasticsearchClientSettings(cluster.NodesUris().First()) + .EnableDebugMode(); + return new ElasticsearchClient(settings); +} + diff --git a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs index 857fe5f0..568bc115 100644 --- a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs +++ b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs @@ -85,6 +85,8 @@ public override void ExportToLog(Summary summary, ILogger logger) }; Options.ChannelOptionsCallback?.Invoke(options); var channel = new EcsDataStreamChannel(options); + if (channel.DiagnosticsListener != null) + Options.ChannelDiagnosticsCallback?.Invoke(channel.DiagnosticsListener); if (!channel.BootstrapElasticsearch(Options.BootstrapMethod)) return; var benchmarks = CreateBenchmarkDocuments(summary); diff --git a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs index d3103885..31b76aee 100644 --- a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs +++ b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs @@ -4,6 +4,7 @@ using System; using System.Linq; +using Elastic.Channels.Diagnostics; using Elastic.CommonSchema.BenchmarkDotNetExporter.Domain; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -95,6 +96,11 @@ public ElasticsearchBenchmarkExporterOptions(params Uri[] nodes) /// Allows the user to directly change used to export the benchmarks public Action> ChannelOptionsCallback { get; set; } + /// + /// Allows programmatic access to active channel diagnostics listener when its created. + /// + public Action ChannelDiagnosticsCallback { get; set; } + private static Uri[] Parse(string urls) { if (string.IsNullOrWhiteSpace(urls)) throw new ArgumentException("no urls provided, empty string or null", nameof(urls)); @@ -146,4 +152,5 @@ internal TransportConfiguration CreateTransportConfiguration() return settings; } } + } diff --git a/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs b/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs index 98bf7e8e..eef0f503 100644 --- a/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs +++ b/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs @@ -1,8 +1,12 @@ using System; +using System.Collections.Generic; using System.Linq; +using Elastic.Channels.Buffers; +using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Serilog.Core; @@ -46,6 +50,11 @@ public ElasticsearchSinkOptions() : this(new DefaultHttpTransport(TransportHelpe /// public Action>? ConfigureChannel { get; set; } + /// + /// Allows programmatic access to active channel diagnostics listener when its created. + /// + public Action? ChannelDiagnosticsCallback { get; set; } + /// public BootstrapMethod BootstrapMethod { get; set; } @@ -73,23 +82,15 @@ public ElasticsearchSink(ElasticsearchSinkOptions options) _formatterConfiguration = options.TextFormatting; var channelOptions = new DataStreamChannelOptions(options.Transport) { - DataStream = options.DataStream, - ExportResponseCallback = (response, _) => - { - var errorItems = response.Items.Where(i => i.Status >= 300).ToList(); - if (response.TryGetElasticsearchServerError(out var error)) - SelfLog.WriteLine("{0}", error); - foreach (var errorItem in errorItems) - SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}"); - - } + DataStream = options.DataStream }; options.ConfigureChannel?.Invoke(channelOptions); - _channel = new EcsDataStreamChannel(channelOptions); + _channel = new EcsDataStreamChannel(channelOptions, new [] { new SelfLogCallbackListener(options)}); + if (_channel.DiagnosticsListener != null) + options.ChannelDiagnosticsCallback?.Invoke(_channel.DiagnosticsListener); _channel.BootstrapElasticsearch(options.BootstrapMethod); } - /// public void Emit(LogEvent logEvent) { @@ -98,4 +99,43 @@ public void Emit(LogEvent logEvent) } } + + internal class SelfLogCallbackListener : IChannelCallbacks where TEcsDocument : EcsDocument, new() + { + public Action? ExportExceptionCallback { get; } + public Action? ExportResponseCallback { get; } + + // ReSharper disable UnassignedGetOnlyAutoProperty + public Action? ExportItemsAttemptCallback { get; } + public Action>? ExportMaxRetriesCallback { get; } + public Action>? ExportRetryCallback { get; } + public Action? PublishToInboundChannelCallback { get; } + public Action? PublishToInboundChannelFailureCallback { get; } + public Action? PublishToOutboundChannelCallback { get; } + public Action? OutboundChannelStartedCallback { get; } + public Action? OutboundChannelExitedCallback { get; } + public Action? InboundChannelStartedCallback { get; } + public Action? PublishToOutboundChannelFailureCallback { get; } + public Action? ExportBufferCallback { get; } + public Action? ExportRetryableCountCallback { get; } + // ReSharper enable UnassignedGetOnlyAutoProperty + + public SelfLogCallbackListener(ElasticsearchSinkOptions options) + { + ExportExceptionCallback = e => + { + SelfLog.WriteLine("Observed an exception while writing to {0}", options.DataStream); + SelfLog.WriteLine("{0}", e); + }; + ExportResponseCallback = (response, _) => + { + var errorItems = response.Items.Where(i => i.Status >= 300).ToList(); + if (response.TryGetElasticsearchServerError(out var error)) + SelfLog.WriteLine("{0}", error); + foreach (var errorItem in errorItems) + SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}"); + + }; + } + } } diff --git a/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs index 62c9905e..04f08750 100644 --- a/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs @@ -1,11 +1,14 @@ #nullable enable +using System.Collections.Generic; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; using Elastic.CommonSchema; using Elastic.CommonSchema.Elasticsearch; using Elastic.CommonSchema.Serialization; using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Serialization; namespace Elastic.Ingest.Elasticsearch.CommonSchema { @@ -15,8 +18,15 @@ namespace Elastic.Ingest.Elasticsearch.CommonSchema public class EcsDataStreamChannel : DataStreamChannel where TEcsDocument : EcsDocument { + + /// + public EcsDataStreamChannel(DataStreamChannelOptions options) : this(options, null) { } + /// - public EcsDataStreamChannel(DataStreamChannelOptions options) : base(options) => + public EcsDataStreamChannel( + DataStreamChannelOptions options, + ICollection>? callbackListeners + ) : base(options, callbackListeners) => options.WriteEvent = async (stream, ctx, @event) => await JsonSerializer.SerializeAsync(stream, @event, typeof(TEcsDocument), EcsJsonConfiguration.SerializerOptions, ctx) .ConfigureAwait(false); diff --git a/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj b/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj index c7e962ed..6ffd7a52 100644 --- a/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj +++ b/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs b/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs index d3a1402d..0382f3ce 100644 --- a/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs +++ b/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs @@ -9,6 +9,7 @@ using System.Text; using Elastic.CommonSchema; using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elasticsearch.Extensions.Logging.Options; using Microsoft.Extensions.Logging; @@ -24,6 +25,9 @@ public class ElasticsearchLogger : ILogger private readonly ElasticsearchLoggerOptions _options; private readonly IExternalScopeProvider? _scopeProvider; + /// + public IChannelDiagnosticsListener? DiagnosticsListener { get; } + internal ElasticsearchLogger( string categoryName, IBufferedChannel channel, @@ -35,6 +39,7 @@ internal ElasticsearchLogger( _channel = channel; _options = options; _scopeProvider = scopeProvider; + DiagnosticsListener = channel.DiagnosticsListener; } /// diff --git a/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs b/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs index 71cb5166..904371ee 100644 --- a/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs +++ b/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs @@ -52,20 +52,21 @@ private static IConfig CreateDefaultConfig() public void BenchmarkingPersistsResults() { var url = Client.ElasticsearchClientSettings.NodePool.Nodes.First().Uri; - var listener = new ChannelListener(); + IChannelDiagnosticsListener listener = null; var options = new ElasticsearchBenchmarkExporterOptions(url) { GitBranch = "externally-provided-branch", GitCommitMessage = "externally provided git commit message", GitRepositoryIdentifier = "repository", BootstrapMethod = BootstrapMethod.Silent, - ChannelOptionsCallback = (o) => listener.Register(o) + ChannelDiagnosticsCallback = (l) => listener = l }; var exporter = new ElasticsearchBenchmarkExporter(options); var config = CreateDefaultConfig().AddExporter(exporter); var summary = BenchmarkRunner.Run(typeof(Md5VsSha256), config); // ensure publication was success + listener.Should().NotBeNull(); listener.PublishSuccess.Should().BeTrue("{0}", listener); if (summary.HasCriticalValidationErrors) diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj index 0e395bc3..a4659b51 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj @@ -4,6 +4,7 @@ net6.0 latest False + enable diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs index d78ed307..ad0af548 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs @@ -3,11 +3,9 @@ [assembly: TestFramework("Elastic.Elasticsearch.Xunit.Sdk.ElasticTestFramework", "Elastic.Elasticsearch.Xunit")] -namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests -{ - public class SerilogCluster : TestClusterBase - { - public SerilogCluster() : base(9205) { } +namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests; - } +public class SerilogCluster : TestClusterBase +{ + public SerilogCluster() : base(9205) { } } diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs index 940d323f..17a83794 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs @@ -18,13 +18,13 @@ namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests { - public class Serilog : SerilogTestBase + public class SerilogOutputTests : SerilogTestBase { + private IChannelDiagnosticsListener? _listener = null; private readonly CountdownEvent _waitHandle; - private readonly ChannelListener _listener; private ElasticsearchSinkOptions SinkOptions { get; } - public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, output) + public SerilogOutputTests(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, output) { var logs = new List> { @@ -36,7 +36,6 @@ public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, }; _waitHandle = new CountdownEvent(1); - _listener = new ChannelListener(); SinkOptions = new ElasticsearchSinkOptions(Client.Transport) { DataStream = new DataStreamName("logs", "serilog", "tests"), @@ -47,8 +46,8 @@ public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, WaitHandle = _waitHandle, OutboundBufferMaxSize = logs.Count }; - _listener.Register(c); - } + }, + ChannelDiagnosticsCallback = (l) => _listener = l }; var loggerConfig = new LoggerConfiguration() @@ -64,7 +63,7 @@ public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, [I] public async Task AssertLogs() { if (!_waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {_listener}", _listener.ObservedException); + throw new Exception($"No flush occurred in 10 seconds: {_listener}", _listener?.ObservedException); var indexName = SinkOptions.DataStream.ToString(); var refreshed = await Client.Indices.RefreshAsync(new RefreshRequest(indexName)); diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs new file mode 100644 index 00000000..6d9ec481 --- /dev/null +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Clients.Elasticsearch; +using Elastic.Clients.Elasticsearch.IndexManagement; +using Elastic.CommonSchema.Serilog.Sink; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using FluentAssertions; +using Serilog; +using Serilog.Core; +using Xunit.Abstractions; +using DataStreamName = Elastic.Ingest.Elasticsearch.DataStreams.DataStreamName; +using BulkResponse = Elastic.Ingest.Elasticsearch.Serialization.BulkResponse; + +namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests +{ + public class SerilogSelfLogTests : SerilogTestBase + { + private readonly CountdownEvent _waitHandle; + private IChannelDiagnosticsListener? _listener; + private ElasticsearchSinkOptions SinkOptions { get; } + + private static ICollection AlterNodes(ICollection uris) => uris.Select(u => + { + var builder = new UriBuilder(u); + builder.Scheme = "https"; + return builder.Uri; + }) + .ToList(); + + public SerilogSelfLogTests(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, output, AlterNodes) + { + _waitHandle = new CountdownEvent(1); + SinkOptions = new ElasticsearchSinkOptions(Client.Transport) + { + DataStream = new DataStreamName("logs", "serilog", "tests"), + ConfigureChannel = c => + { + c.BufferOptions = new BufferOptions + { + ExportMaxRetries = 0, + WaitHandle = _waitHandle, + OutboundBufferMaxSize = 1 + }; + }, + ChannelDiagnosticsCallback = (l) => _listener = l + }; + } + + + [I] public void AssertLogs() + { + List messages = new(); + global::Serilog.Debugging.SelfLog.Enable(msg => + { + messages.Add(msg); + }); + + var loggerConfig = new LoggerConfiguration() + .MinimumLevel.Information() + .WriteTo.ColoredConsole() + .WriteTo.Elasticsearch(SinkOptions); + + using var logger = loggerConfig.CreateLogger(); + logger.Information("Hello world"); + + if (!_waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {_listener}", _listener?.ObservedException); + + messages.Should().NotBeEmpty(); + global::Serilog.Debugging.SelfLog.Disable(); + } + + } +} diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs index 73ecd57d..7b6afb5c 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs @@ -1,4 +1,6 @@ -using Elastic.Clients.Elasticsearch; +using System; +using System.Collections.Generic; +using Elastic.Clients.Elasticsearch; using Elastic.Elasticsearch.Xunit.XunitPlumbing; using Xunit.Abstractions; @@ -8,8 +10,8 @@ public abstract class SerilogTestBase : IClusterFixture { protected ElasticsearchClient Client { get; } - protected SerilogTestBase(SerilogCluster cluster, ITestOutputHelper output) => - Client = cluster.CreateClient(output); + protected SerilogTestBase(SerilogCluster cluster, ITestOutputHelper output, Func, ICollection>? alterNodes = null) => + Client = cluster.CreateClient(output, alterNodes); } } diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs index e011a442..2383cf57 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs @@ -31,7 +31,6 @@ public async Task EnsureDocumentsEndUpInDataStream() DataStream = targetDataStream, BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, }; - var listener = new ChannelListener().Register(options); var channel = new EcsDataStreamChannel(options); var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); @@ -43,7 +42,7 @@ public async Task EnsureDocumentsEndUpInDataStream() channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" }); if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException); + throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException); var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString()); refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs index 6e1213a5..6e7af5d8 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs @@ -37,7 +37,6 @@ public async Task EnsureDocumentsEndUpInIndex() WaitHandle = slim, ExportMaxConcurrency = 1, }, }; - var listener = new ChannelListener().Register(options); var channel = new EcsIndexChannel(options); var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); bootstrapped.Should().BeTrue("Expected to be able to bootstrap index channel"); @@ -50,7 +49,7 @@ public async Task EnsureDocumentsEndUpInIndex() channel.TryWrite(new CatalogDocument { Created = date, Title = "Hello World!", Id = "hello-world" }); if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException); + throw new Exception($"No flush occurred in 10 seconds: {channel.DiagnosticsListener}", channel.DiagnosticsListener?.ObservedException); var refreshResult = await Client.Indices.RefreshAsync(indexName); refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs index 879efc72..d0aec0c4 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs @@ -24,12 +24,10 @@ protected IDisposable CreateLogger( out ElasticsearchLoggerProvider provider, out string @namespace, out WaitHandle waitHandle, - out ChannelListener listener, + out IChannelDiagnosticsListener listener, Action setupLogger ) { - listener = new ChannelListener(); - var l = listener; @namespace = Guid.NewGuid().ToString("N").ToLowerInvariant().Substring(0, 6); var slim = new CountdownEvent(1); waitHandle = slim.WaitHandle; @@ -45,7 +43,6 @@ Action setupLogger c.BufferOptions.OutboundBufferMaxLifetime = TimeSpan.FromSeconds(1); c.BufferOptions.ExportMaxRetries = 0; c.BufferOptions.ExportMaxConcurrency = 1; - l.Register(c); }) }; diff --git a/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj b/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj index f1b59a37..a91b6c7a 100644 --- a/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj +++ b/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj @@ -11,7 +11,7 @@ - + diff --git a/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs b/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs index 478f1739..3c8389a0 100644 --- a/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs +++ b/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs @@ -20,17 +20,19 @@ protected TestClusterBase(int port = 9200) : base(new XunitClusterConfiguration( StartingPortNumber = port }) { } - public ElasticsearchClient CreateClient(ITestOutputHelper output) => + public ElasticsearchClient CreateClient(ITestOutputHelper output, Func, ICollection>? alterNodes = null) => this.GetOrAddClient(_ => { var hostName = (System.Diagnostics.Process.GetProcessesByName("mitmproxy").Any() ? "ipv4.fiddler" : "localhost"); var nodes = NodesUris(hostName); + if (alterNodes != null) nodes = alterNodes(nodes); var connectionPool = new StaticNodePool(nodes); var isCi = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("CI")); var settings = new ElasticsearchClientSettings(connectionPool) .Proxy(new Uri("http://ipv4.fiddler:8080"), null!, null!) + .RequestTimeout(TimeSpan.FromSeconds(5)) .OnRequestCompleted(d => { try From facc86aeea1cf850cd5db2b82507713345fc040b Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 18 Apr 2023 17:18:54 +0200 Subject: [PATCH 2/2] fix compilation error --- .../ElasticsearchLoggerProvider.cs | 5 +++++ .../LoggingToDataStreamTests.cs | 2 +- .../LoggingToIndexTests.cs | 2 +- .../TestBase.cs | 1 + 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs b/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs index a0cdf36d..25a56fdb 100644 --- a/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs +++ b/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Threading; using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -32,6 +33,9 @@ public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScop private IExternalScopeProvider? _scopeProvider; private IBufferedChannel _shipper; + /// + public IChannelDiagnosticsListener? DiagnosticsListener { get; } + /// public ElasticsearchLoggerProvider(IOptionsMonitor options, IEnumerable channelConfigurations @@ -46,6 +50,7 @@ IEnumerable channelConfigurations _shipper = CreatIngestChannel(options.CurrentValue); _optionsReloadToken = _options.OnChange(o => ReloadShipper(o)); + DiagnosticsListener = _shipper.DiagnosticsListener; } // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs index 28c62763..259d3b9e 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs @@ -23,7 +23,7 @@ private IDisposable CreateLogger( out ElasticsearchLoggerProvider provider, out string @namespace, out WaitHandle waitHandle, - out ChannelListener listener + out IChannelDiagnosticsListener listener ) => base.CreateLogger(out logger, out provider, out @namespace, out waitHandle, out listener, (o, s) => { diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs index 84f9afa4..54e1a163 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs @@ -23,7 +23,7 @@ private IDisposable CreateLogger( out ElasticsearchLoggerProvider provider, out string @namespace, out WaitHandle waitHandle, - out ChannelListener listener + out IChannelDiagnosticsListener listener ) => base.CreateLogger(out logger, out provider, out @namespace, out waitHandle, out listener, (o, s) => { diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs index d0aec0c4..422e87cd 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs @@ -57,6 +57,7 @@ Action setupLogger new LoggerFilterOptions { MinLevel = LogLevel.Information } ); logger = loggerFactory.CreateLogger(); + listener = provider.DiagnosticsListener; return loggerFactory; } }