diff --git a/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs b/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs index b66cddef..94189b73 100644 --- a/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs +++ b/examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs @@ -1,7 +1,9 @@ // See https://aka.ms/new-console-template for more information using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elastic.Clients.Elasticsearch; +using Elastic.CommonSchema; using Elastic.CommonSchema.Serilog; using Elastic.CommonSchema.Serilog.Sink; using Elastic.CommonSchema.Serilog.Sink.Example; @@ -12,6 +14,7 @@ using Microsoft.Extensions.Hosting; using Serilog; using Serilog.Events; +using BulkResponse = Elastic.Ingest.Elasticsearch.Serialization.BulkResponse; using DataStreamName = Elastic.Ingest.Elasticsearch.DataStreams.DataStreamName; using Host = Microsoft.Extensions.Hosting.Host; using Log = Serilog.Log; @@ -24,6 +27,8 @@ cluster.Start(TimeSpan.FromMinutes(1)); else Console.WriteLine("Using already running Elasticsearch instance"); +var waitHandle = new CountdownEvent(1); +IChannelDiagnosticsListener? listener = null; // -- Setup Serilog -- var nodes = new[] { new Uri("http://localhost:9200") }; @@ -33,40 +38,39 @@ .Enrich.FromLogContext() .WriteTo.Elasticsearch(nodes, opts => { - opts.BootstrapMethod = BootstrapMethod.Failure; + opts.BootstrapMethod = BootstrapMethod.None; opts.DataStream = new DataStreamName("logs", "console-example"); opts.ConfigureChannel = channelOpts => { - channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 }; + channelOpts.BufferOptions = new BufferOptions + { + ExportMaxConcurrency = 1, + OutboundBufferMaxSize = 2, + WaitHandle = waitHandle + }; }; - }, transport => - { - //transport.Authentication(); - }) - // This is the bit that Elastic.CommonSchema.Serilog.Sink introduces - .WriteTo.Elasticsearch(new ElasticsearchSinkOptions(client.Transport) - { - BootstrapMethod = BootstrapMethod.Failure, - DataStream = new DataStreamName("logs", "console-example"), - TextFormatting = new EcsTextFormatterConfiguration - { - MapCustom = (e, _) => e - }, - ConfigureChannel = channelOpts => { - channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 }; - } + opts.ChannelDiagnosticsCallback = l => listener = l; + }) .CreateLogger(); +// -- Log 2 items and wait for flush -- +Log.Logger.Information("Writing event 1"); +Log.Logger.Information("Writing event 2"); + +if (!waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {listener}", listener?.ObservedException); +else +{ + Console.WriteLine("Successfully indexed data to Elasticsearch"); + Console.WriteLine(listener); +} + + // -- Setup Console Host -- +/* var consoleHost = CreateHostBuilder(args, client).Build(); await consoleHost.RunAsync(); -static ElasticsearchClient CreateClient(EphemeralCluster cluster) -{ - var settings = new ElasticsearchClientSettings(cluster.NodesUris().First()) - .EnableDebugMode(); - return new ElasticsearchClient(settings); -} static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client) { @@ -83,3 +87,11 @@ static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client) }) .UseSerilog(); } +*/ +static ElasticsearchClient CreateClient(EphemeralCluster cluster) +{ + var settings = new ElasticsearchClientSettings(cluster.NodesUris().First()) + .EnableDebugMode(); + return new ElasticsearchClient(settings); +} + diff --git a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs index 857fe5f0..568bc115 100644 --- a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs +++ b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporter.cs @@ -85,6 +85,8 @@ public override void ExportToLog(Summary summary, ILogger logger) }; Options.ChannelOptionsCallback?.Invoke(options); var channel = new EcsDataStreamChannel(options); + if (channel.DiagnosticsListener != null) + Options.ChannelDiagnosticsCallback?.Invoke(channel.DiagnosticsListener); if (!channel.BootstrapElasticsearch(Options.BootstrapMethod)) return; var benchmarks = CreateBenchmarkDocuments(summary); diff --git a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs index d3103885..31b76aee 100644 --- a/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs +++ b/src/Elastic.CommonSchema.BenchmarkDotNetExporter/ElasticsearchBenchmarkExporterOptions.cs @@ -4,6 +4,7 @@ using System; using System.Linq; +using Elastic.Channels.Diagnostics; using Elastic.CommonSchema.BenchmarkDotNetExporter.Domain; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -95,6 +96,11 @@ public ElasticsearchBenchmarkExporterOptions(params Uri[] nodes) /// Allows the user to directly change used to export the benchmarks public Action> ChannelOptionsCallback { get; set; } + /// + /// Allows programmatic access to active channel diagnostics listener when its created. + /// + public Action ChannelDiagnosticsCallback { get; set; } + private static Uri[] Parse(string urls) { if (string.IsNullOrWhiteSpace(urls)) throw new ArgumentException("no urls provided, empty string or null", nameof(urls)); @@ -146,4 +152,5 @@ internal TransportConfiguration CreateTransportConfiguration() return settings; } } + } diff --git a/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs b/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs index 98bf7e8e..eef0f503 100644 --- a/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs +++ b/src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs @@ -1,8 +1,12 @@ using System; +using System.Collections.Generic; using System.Linq; +using Elastic.Channels.Buffers; +using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Serialization; using Elastic.Transport; using Elastic.Transport.Products.Elasticsearch; using Serilog.Core; @@ -46,6 +50,11 @@ public ElasticsearchSinkOptions() : this(new DefaultHttpTransport(TransportHelpe /// public Action>? ConfigureChannel { get; set; } + /// + /// Allows programmatic access to active channel diagnostics listener when its created. + /// + public Action? ChannelDiagnosticsCallback { get; set; } + /// public BootstrapMethod BootstrapMethod { get; set; } @@ -73,23 +82,15 @@ public ElasticsearchSink(ElasticsearchSinkOptions options) _formatterConfiguration = options.TextFormatting; var channelOptions = new DataStreamChannelOptions(options.Transport) { - DataStream = options.DataStream, - ExportResponseCallback = (response, _) => - { - var errorItems = response.Items.Where(i => i.Status >= 300).ToList(); - if (response.TryGetElasticsearchServerError(out var error)) - SelfLog.WriteLine("{0}", error); - foreach (var errorItem in errorItems) - SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}"); - - } + DataStream = options.DataStream }; options.ConfigureChannel?.Invoke(channelOptions); - _channel = new EcsDataStreamChannel(channelOptions); + _channel = new EcsDataStreamChannel(channelOptions, new [] { new SelfLogCallbackListener(options)}); + if (_channel.DiagnosticsListener != null) + options.ChannelDiagnosticsCallback?.Invoke(_channel.DiagnosticsListener); _channel.BootstrapElasticsearch(options.BootstrapMethod); } - /// public void Emit(LogEvent logEvent) { @@ -98,4 +99,43 @@ public void Emit(LogEvent logEvent) } } + + internal class SelfLogCallbackListener : IChannelCallbacks where TEcsDocument : EcsDocument, new() + { + public Action? ExportExceptionCallback { get; } + public Action? ExportResponseCallback { get; } + + // ReSharper disable UnassignedGetOnlyAutoProperty + public Action? ExportItemsAttemptCallback { get; } + public Action>? ExportMaxRetriesCallback { get; } + public Action>? ExportRetryCallback { get; } + public Action? PublishToInboundChannelCallback { get; } + public Action? PublishToInboundChannelFailureCallback { get; } + public Action? PublishToOutboundChannelCallback { get; } + public Action? OutboundChannelStartedCallback { get; } + public Action? OutboundChannelExitedCallback { get; } + public Action? InboundChannelStartedCallback { get; } + public Action? PublishToOutboundChannelFailureCallback { get; } + public Action? ExportBufferCallback { get; } + public Action? ExportRetryableCountCallback { get; } + // ReSharper enable UnassignedGetOnlyAutoProperty + + public SelfLogCallbackListener(ElasticsearchSinkOptions options) + { + ExportExceptionCallback = e => + { + SelfLog.WriteLine("Observed an exception while writing to {0}", options.DataStream); + SelfLog.WriteLine("{0}", e); + }; + ExportResponseCallback = (response, _) => + { + var errorItems = response.Items.Where(i => i.Status >= 300).ToList(); + if (response.TryGetElasticsearchServerError(out var error)) + SelfLog.WriteLine("{0}", error); + foreach (var errorItem in errorItems) + SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}"); + + }; + } + } } diff --git a/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs index 62c9905e..04f08750 100644 --- a/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch.CommonSchema/EcsDataStreamChannel.cs @@ -1,11 +1,14 @@ #nullable enable +using System.Collections.Generic; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Elastic.Channels.Diagnostics; using Elastic.CommonSchema; using Elastic.CommonSchema.Elasticsearch; using Elastic.CommonSchema.Serialization; using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Serialization; namespace Elastic.Ingest.Elasticsearch.CommonSchema { @@ -15,8 +18,15 @@ namespace Elastic.Ingest.Elasticsearch.CommonSchema public class EcsDataStreamChannel : DataStreamChannel where TEcsDocument : EcsDocument { + + /// + public EcsDataStreamChannel(DataStreamChannelOptions options) : this(options, null) { } + /// - public EcsDataStreamChannel(DataStreamChannelOptions options) : base(options) => + public EcsDataStreamChannel( + DataStreamChannelOptions options, + ICollection>? callbackListeners + ) : base(options, callbackListeners) => options.WriteEvent = async (stream, ctx, @event) => await JsonSerializer.SerializeAsync(stream, @event, typeof(TEcsDocument), EcsJsonConfiguration.SerializerOptions, ctx) .ConfigureAwait(false); diff --git a/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj b/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj index c7e962ed..6ffd7a52 100644 --- a/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj +++ b/src/Elastic.Ingest.Elasticsearch.CommonSchema/Elastic.Ingest.Elasticsearch.CommonSchema.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs b/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs index d3a1402d..0382f3ce 100644 --- a/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs +++ b/src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs @@ -9,6 +9,7 @@ using System.Text; using Elastic.CommonSchema; using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elasticsearch.Extensions.Logging.Options; using Microsoft.Extensions.Logging; @@ -24,6 +25,9 @@ public class ElasticsearchLogger : ILogger private readonly ElasticsearchLoggerOptions _options; private readonly IExternalScopeProvider? _scopeProvider; + /// + public IChannelDiagnosticsListener? DiagnosticsListener { get; } + internal ElasticsearchLogger( string categoryName, IBufferedChannel channel, @@ -35,6 +39,7 @@ internal ElasticsearchLogger( _channel = channel; _options = options; _scopeProvider = scopeProvider; + DiagnosticsListener = channel.DiagnosticsListener; } /// diff --git a/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs b/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs index a0cdf36d..25a56fdb 100644 --- a/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs +++ b/src/Elasticsearch.Extensions.Logging/ElasticsearchLoggerProvider.cs @@ -7,6 +7,7 @@ using System.Linq; using System.Threading; using Elastic.Channels; +using Elastic.Channels.Diagnostics; using Elastic.Ingest.Elasticsearch; using Elastic.Ingest.Elasticsearch.CommonSchema; using Elastic.Ingest.Elasticsearch.DataStreams; @@ -32,6 +33,9 @@ public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScop private IExternalScopeProvider? _scopeProvider; private IBufferedChannel _shipper; + /// + public IChannelDiagnosticsListener? DiagnosticsListener { get; } + /// public ElasticsearchLoggerProvider(IOptionsMonitor options, IEnumerable channelConfigurations @@ -46,6 +50,7 @@ IEnumerable channelConfigurations _shipper = CreatIngestChannel(options.CurrentValue); _optionsReloadToken = _options.OnChange(o => ReloadShipper(o)); + DiagnosticsListener = _shipper.DiagnosticsListener; } // ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global diff --git a/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs b/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs index 71cb5166..904371ee 100644 --- a/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs +++ b/tests-integration/Elastic.CommonSchema.BenchmarkDotNetExporter.IntegrationTests/BdNetExporterTests.cs @@ -52,20 +52,21 @@ private static IConfig CreateDefaultConfig() public void BenchmarkingPersistsResults() { var url = Client.ElasticsearchClientSettings.NodePool.Nodes.First().Uri; - var listener = new ChannelListener(); + IChannelDiagnosticsListener listener = null; var options = new ElasticsearchBenchmarkExporterOptions(url) { GitBranch = "externally-provided-branch", GitCommitMessage = "externally provided git commit message", GitRepositoryIdentifier = "repository", BootstrapMethod = BootstrapMethod.Silent, - ChannelOptionsCallback = (o) => listener.Register(o) + ChannelDiagnosticsCallback = (l) => listener = l }; var exporter = new ElasticsearchBenchmarkExporter(options); var config = CreateDefaultConfig().AddExporter(exporter); var summary = BenchmarkRunner.Run(typeof(Md5VsSha256), config); // ensure publication was success + listener.Should().NotBeNull(); listener.PublishSuccess.Should().BeTrue("{0}", listener); if (summary.HasCriticalValidationErrors) diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj index 0e395bc3..a4659b51 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/Elastic.CommonSchema.Serilog.Sinks.IntegrationTests.csproj @@ -4,6 +4,7 @@ net6.0 latest False + enable diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs index d78ed307..ad0af548 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogCluster.cs @@ -3,11 +3,9 @@ [assembly: TestFramework("Elastic.Elasticsearch.Xunit.Sdk.ElasticTestFramework", "Elastic.Elasticsearch.Xunit")] -namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests -{ - public class SerilogCluster : TestClusterBase - { - public SerilogCluster() : base(9205) { } +namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests; - } +public class SerilogCluster : TestClusterBase +{ + public SerilogCluster() : base(9205) { } } diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs index 940d323f..17a83794 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogOutputTests.cs @@ -18,13 +18,13 @@ namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests { - public class Serilog : SerilogTestBase + public class SerilogOutputTests : SerilogTestBase { + private IChannelDiagnosticsListener? _listener = null; private readonly CountdownEvent _waitHandle; - private readonly ChannelListener _listener; private ElasticsearchSinkOptions SinkOptions { get; } - public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, output) + public SerilogOutputTests(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, output) { var logs = new List> { @@ -36,7 +36,6 @@ public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, }; _waitHandle = new CountdownEvent(1); - _listener = new ChannelListener(); SinkOptions = new ElasticsearchSinkOptions(Client.Transport) { DataStream = new DataStreamName("logs", "serilog", "tests"), @@ -47,8 +46,8 @@ public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, WaitHandle = _waitHandle, OutboundBufferMaxSize = logs.Count }; - _listener.Register(c); - } + }, + ChannelDiagnosticsCallback = (l) => _listener = l }; var loggerConfig = new LoggerConfiguration() @@ -64,7 +63,7 @@ public Serilog(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, [I] public async Task AssertLogs() { if (!_waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {_listener}", _listener.ObservedException); + throw new Exception($"No flush occurred in 10 seconds: {_listener}", _listener?.ObservedException); var indexName = SinkOptions.DataStream.ToString(); var refreshed = await Client.Indices.RefreshAsync(new RefreshRequest(indexName)); diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs new file mode 100644 index 00000000..6d9ec481 --- /dev/null +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogSelfLogTests.cs @@ -0,0 +1,79 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Clients.Elasticsearch; +using Elastic.Clients.Elasticsearch.IndexManagement; +using Elastic.CommonSchema.Serilog.Sink; +using Elastic.Elasticsearch.Xunit.XunitPlumbing; +using FluentAssertions; +using Serilog; +using Serilog.Core; +using Xunit.Abstractions; +using DataStreamName = Elastic.Ingest.Elasticsearch.DataStreams.DataStreamName; +using BulkResponse = Elastic.Ingest.Elasticsearch.Serialization.BulkResponse; + +namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests +{ + public class SerilogSelfLogTests : SerilogTestBase + { + private readonly CountdownEvent _waitHandle; + private IChannelDiagnosticsListener? _listener; + private ElasticsearchSinkOptions SinkOptions { get; } + + private static ICollection AlterNodes(ICollection uris) => uris.Select(u => + { + var builder = new UriBuilder(u); + builder.Scheme = "https"; + return builder.Uri; + }) + .ToList(); + + public SerilogSelfLogTests(SerilogCluster cluster, ITestOutputHelper output) : base(cluster, output, AlterNodes) + { + _waitHandle = new CountdownEvent(1); + SinkOptions = new ElasticsearchSinkOptions(Client.Transport) + { + DataStream = new DataStreamName("logs", "serilog", "tests"), + ConfigureChannel = c => + { + c.BufferOptions = new BufferOptions + { + ExportMaxRetries = 0, + WaitHandle = _waitHandle, + OutboundBufferMaxSize = 1 + }; + }, + ChannelDiagnosticsCallback = (l) => _listener = l + }; + } + + + [I] public void AssertLogs() + { + List messages = new(); + global::Serilog.Debugging.SelfLog.Enable(msg => + { + messages.Add(msg); + }); + + var loggerConfig = new LoggerConfiguration() + .MinimumLevel.Information() + .WriteTo.ColoredConsole() + .WriteTo.Elasticsearch(SinkOptions); + + using var logger = loggerConfig.CreateLogger(); + logger.Information("Hello world"); + + if (!_waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) + throw new Exception($"No flush occurred in 10 seconds: {_listener}", _listener?.ObservedException); + + messages.Should().NotBeEmpty(); + global::Serilog.Debugging.SelfLog.Disable(); + } + + } +} diff --git a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs index 73ecd57d..7b6afb5c 100644 --- a/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs +++ b/tests-integration/Elastic.CommonSchema.Serilog.Sink.IntegrationTests/SerilogTestBase.cs @@ -1,4 +1,6 @@ -using Elastic.Clients.Elasticsearch; +using System; +using System.Collections.Generic; +using Elastic.Clients.Elasticsearch; using Elastic.Elasticsearch.Xunit.XunitPlumbing; using Xunit.Abstractions; @@ -8,8 +10,8 @@ public abstract class SerilogTestBase : IClusterFixture { protected ElasticsearchClient Client { get; } - protected SerilogTestBase(SerilogCluster cluster, ITestOutputHelper output) => - Client = cluster.CreateClient(output); + protected SerilogTestBase(SerilogCluster cluster, ITestOutputHelper output, Func, ICollection>? alterNodes = null) => + Client = cluster.CreateClient(output, alterNodes); } } diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs index e011a442..2383cf57 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/DataStreamIngestionTests.cs @@ -31,7 +31,6 @@ public async Task EnsureDocumentsEndUpInDataStream() DataStream = targetDataStream, BufferOptions = new BufferOptions { WaitHandle = slim, OutboundBufferMaxSize = 1 }, }; - var listener = new ChannelListener().Register(options); var channel = new EcsDataStreamChannel(options); var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); @@ -43,7 +42,7 @@ public async Task EnsureDocumentsEndUpInDataStream() channel.TryWrite(new TimeSeriesDocument { Timestamp = DateTimeOffset.Now, Message = "hello-world" }); if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException); + throw new Exception($"No flush occurred in 10 seconds: {channel}", channel.DiagnosticsListener?.ObservedException); var refreshResult = await Client.Indices.RefreshAsync(targetDataStream.ToString()); refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); diff --git a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs index 6e1213a5..6e7af5d8 100644 --- a/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs +++ b/tests-integration/Elastic.Ingest.Elasticsearch.CommonSchema.IntegrationTests/IndexIngestionTests.cs @@ -37,7 +37,6 @@ public async Task EnsureDocumentsEndUpInIndex() WaitHandle = slim, ExportMaxConcurrency = 1, }, }; - var listener = new ChannelListener().Register(options); var channel = new EcsIndexChannel(options); var bootstrapped = await channel.BootstrapElasticsearchAsync(BootstrapMethod.Failure, "7-days-default"); bootstrapped.Should().BeTrue("Expected to be able to bootstrap index channel"); @@ -50,7 +49,7 @@ public async Task EnsureDocumentsEndUpInIndex() channel.TryWrite(new CatalogDocument { Created = date, Title = "Hello World!", Id = "hello-world" }); if (!slim.WaitHandle.WaitOne(TimeSpan.FromSeconds(10))) - throw new Exception($"No flush occurred in 10 seconds: {listener}", listener.ObservedException); + throw new Exception($"No flush occurred in 10 seconds: {channel.DiagnosticsListener}", channel.DiagnosticsListener?.ObservedException); var refreshResult = await Client.Indices.RefreshAsync(indexName); refreshResult.IsValidResponse.Should().BeTrue("{0}", refreshResult.DebugInformation); diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs index 28c62763..259d3b9e 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToDataStreamTests.cs @@ -23,7 +23,7 @@ private IDisposable CreateLogger( out ElasticsearchLoggerProvider provider, out string @namespace, out WaitHandle waitHandle, - out ChannelListener listener + out IChannelDiagnosticsListener listener ) => base.CreateLogger(out logger, out provider, out @namespace, out waitHandle, out listener, (o, s) => { diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs index 84f9afa4..54e1a163 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/LoggingToIndexTests.cs @@ -23,7 +23,7 @@ private IDisposable CreateLogger( out ElasticsearchLoggerProvider provider, out string @namespace, out WaitHandle waitHandle, - out ChannelListener listener + out IChannelDiagnosticsListener listener ) => base.CreateLogger(out logger, out provider, out @namespace, out waitHandle, out listener, (o, s) => { diff --git a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs index 879efc72..422e87cd 100644 --- a/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs +++ b/tests-integration/Elasticsearch.Extensions.Logging.IntegrationTests/TestBase.cs @@ -24,12 +24,10 @@ protected IDisposable CreateLogger( out ElasticsearchLoggerProvider provider, out string @namespace, out WaitHandle waitHandle, - out ChannelListener listener, + out IChannelDiagnosticsListener listener, Action setupLogger ) { - listener = new ChannelListener(); - var l = listener; @namespace = Guid.NewGuid().ToString("N").ToLowerInvariant().Substring(0, 6); var slim = new CountdownEvent(1); waitHandle = slim.WaitHandle; @@ -45,7 +43,6 @@ Action setupLogger c.BufferOptions.OutboundBufferMaxLifetime = TimeSpan.FromSeconds(1); c.BufferOptions.ExportMaxRetries = 0; c.BufferOptions.ExportMaxConcurrency = 1; - l.Register(c); }) }; @@ -60,6 +57,7 @@ Action setupLogger new LoggerFilterOptions { MinLevel = LogLevel.Information } ); logger = loggerFactory.CreateLogger(); + listener = provider.DiagnosticsListener; return loggerFactory; } } diff --git a/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj b/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj index f1b59a37..a91b6c7a 100644 --- a/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj +++ b/tests-integration/Elasticsearch.IntegrationDefaults/Elasticsearch.IntegrationDefaults.csproj @@ -11,7 +11,7 @@ - + diff --git a/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs b/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs index 478f1739..3c8389a0 100644 --- a/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs +++ b/tests-integration/Elasticsearch.IntegrationDefaults/IngestionCluster.cs @@ -20,17 +20,19 @@ protected TestClusterBase(int port = 9200) : base(new XunitClusterConfiguration( StartingPortNumber = port }) { } - public ElasticsearchClient CreateClient(ITestOutputHelper output) => + public ElasticsearchClient CreateClient(ITestOutputHelper output, Func, ICollection>? alterNodes = null) => this.GetOrAddClient(_ => { var hostName = (System.Diagnostics.Process.GetProcessesByName("mitmproxy").Any() ? "ipv4.fiddler" : "localhost"); var nodes = NodesUris(hostName); + if (alterNodes != null) nodes = alterNodes(nodes); var connectionPool = new StaticNodePool(nodes); var isCi = !string.IsNullOrEmpty(Environment.GetEnvironmentVariable("CI")); var settings = new ElasticsearchClientSettings(connectionPool) .Proxy(new Uri("http://ipv4.fiddler:8080"), null!, null!) + .RequestTimeout(TimeSpan.FromSeconds(5)) .OnRequestCompleted(d => { try