Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update to latest Elastic.Ingest so we can more reliably log to selflog #293

Merged
merged 2 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 36 additions & 24 deletions examples/Elastic.CommonSchema.Serilog.Sink.Example/Program.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
// See https://aka.ms/new-console-template for more information

using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Clients.Elasticsearch;
using Elastic.CommonSchema;
using Elastic.CommonSchema.Serilog;
using Elastic.CommonSchema.Serilog.Sink;
using Elastic.CommonSchema.Serilog.Sink.Example;
Expand All @@ -12,6 +14,7 @@
using Microsoft.Extensions.Hosting;
using Serilog;
using Serilog.Events;
using BulkResponse = Elastic.Ingest.Elasticsearch.Serialization.BulkResponse;
using DataStreamName = Elastic.Ingest.Elasticsearch.DataStreams.DataStreamName;
using Host = Microsoft.Extensions.Hosting.Host;
using Log = Serilog.Log;
Expand All @@ -24,6 +27,8 @@
cluster.Start(TimeSpan.FromMinutes(1));
else Console.WriteLine("Using already running Elasticsearch instance");

var waitHandle = new CountdownEvent(1);
IChannelDiagnosticsListener? listener = null;

// -- Setup Serilog --
var nodes = new[] { new Uri("http://localhost:9200") };
Expand All @@ -33,40 +38,39 @@
.Enrich.FromLogContext()
.WriteTo.Elasticsearch(nodes, opts =>
{
opts.BootstrapMethod = BootstrapMethod.Failure;
opts.BootstrapMethod = BootstrapMethod.None;
opts.DataStream = new DataStreamName("logs", "console-example");
opts.ConfigureChannel = channelOpts => {
channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 };
channelOpts.BufferOptions = new BufferOptions
{
ExportMaxConcurrency = 1,
OutboundBufferMaxSize = 2,
WaitHandle = waitHandle
};
};
}, transport =>
{
//transport.Authentication();
})
// This is the bit that Elastic.CommonSchema.Serilog.Sink introduces
.WriteTo.Elasticsearch(new ElasticsearchSinkOptions(client.Transport)
{
BootstrapMethod = BootstrapMethod.Failure,
DataStream = new DataStreamName("logs", "console-example"),
TextFormatting = new EcsTextFormatterConfiguration
{
MapCustom = (e, _) => e
},
ConfigureChannel = channelOpts => {
channelOpts.BufferOptions = new BufferOptions { ExportMaxConcurrency = 10 };
}
opts.ChannelDiagnosticsCallback = l => listener = l;

})
.CreateLogger();

// -- Log 2 items and wait for flush --
Log.Logger.Information("Writing event 1");
Log.Logger.Information("Writing event 2");

if (!waitHandle.WaitHandle.WaitOne(TimeSpan.FromSeconds(10)))
throw new Exception($"No flush occurred in 10 seconds: {listener}", listener?.ObservedException);
else
{
Console.WriteLine("Successfully indexed data to Elasticsearch");
Console.WriteLine(listener);
}


// -- Setup Console Host --
/*
var consoleHost = CreateHostBuilder(args, client).Build();
await consoleHost.RunAsync();

static ElasticsearchClient CreateClient(EphemeralCluster cluster)
{
var settings = new ElasticsearchClientSettings(cluster.NodesUris().First())
.EnableDebugMode();
return new ElasticsearchClient(settings);
}

static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client)
{
Expand All @@ -83,3 +87,11 @@ static IHostBuilder CreateHostBuilder(string[] args, ElasticsearchClient client)
})
.UseSerilog();
}
*/
static ElasticsearchClient CreateClient(EphemeralCluster cluster)
{
var settings = new ElasticsearchClientSettings(cluster.NodesUris().First())
.EnableDebugMode();
return new ElasticsearchClient(settings);
}

Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public override void ExportToLog(Summary summary, ILogger logger)
};
Options.ChannelOptionsCallback?.Invoke(options);
var channel = new EcsDataStreamChannel<BenchmarkDocument>(options);
if (channel.DiagnosticsListener != null)
Options.ChannelDiagnosticsCallback?.Invoke(channel.DiagnosticsListener);
if (!channel.BootstrapElasticsearch(Options.BootstrapMethod)) return;

var benchmarks = CreateBenchmarkDocuments(summary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

using System;
using System.Linq;
using Elastic.Channels.Diagnostics;
using Elastic.CommonSchema.BenchmarkDotNetExporter.Domain;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.DataStreams;
Expand Down Expand Up @@ -95,6 +96,11 @@ public ElasticsearchBenchmarkExporterOptions(params Uri[] nodes)
/// <summary> Allows the user to directly change <see cref="DataStreamChannelOptions{TEvent}"/> used to export the benchmarks </summary>
public Action<DataStreamChannelOptions<BenchmarkDocument>> ChannelOptionsCallback { get; set; }

/// <summary>
/// Allows programmatic access to active channel diagnostics listener when its created.
/// </summary>
public Action<IChannelDiagnosticsListener> ChannelDiagnosticsCallback { get; set; }

private static Uri[] Parse(string urls)
{
if (string.IsNullOrWhiteSpace(urls)) throw new ArgumentException("no urls provided, empty string or null", nameof(urls));
Expand Down Expand Up @@ -146,4 +152,5 @@ internal TransportConfiguration CreateTransportConfiguration()
return settings;
}
}

}
64 changes: 52 additions & 12 deletions src/Elastic.CommonSchema.Serilog.Sink/ElasticsearchSink.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Elastic.Channels.Buffers;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Serialization;
using Elastic.Transport;
using Elastic.Transport.Products.Elasticsearch;
using Serilog.Core;
Expand Down Expand Up @@ -46,6 +50,11 @@ public ElasticsearchSinkOptions() : this(new DefaultHttpTransport(TransportHelpe
/// </summary>
public Action<DataStreamChannelOptions<TEcsDocument>>? ConfigureChannel { get; set; }

/// <summary>
/// Allows programmatic access to active channel diagnostics listener when its created.
/// </summary>
public Action<IChannelDiagnosticsListener>? ChannelDiagnosticsCallback { get; set; }

/// <inheritdoc cref="BootstrapMethod"/>
public BootstrapMethod BootstrapMethod { get; set; }

Expand Down Expand Up @@ -73,23 +82,15 @@ public ElasticsearchSink(ElasticsearchSinkOptions<TEcsDocument> options)
_formatterConfiguration = options.TextFormatting;
var channelOptions = new DataStreamChannelOptions<TEcsDocument>(options.Transport)
{
DataStream = options.DataStream,
ExportResponseCallback = (response, _) =>
{
var errorItems = response.Items.Where(i => i.Status >= 300).ToList();
if (response.TryGetElasticsearchServerError(out var error))
SelfLog.WriteLine("{0}", error);
foreach (var errorItem in errorItems)
SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}");

}
DataStream = options.DataStream
};
options.ConfigureChannel?.Invoke(channelOptions);
_channel = new EcsDataStreamChannel<TEcsDocument>(channelOptions);
_channel = new EcsDataStreamChannel<TEcsDocument>(channelOptions, new [] { new SelfLogCallbackListener<TEcsDocument>(options)});
if (_channel.DiagnosticsListener != null)
options.ChannelDiagnosticsCallback?.Invoke(_channel.DiagnosticsListener);
_channel.BootstrapElasticsearch(options.BootstrapMethod);
}


/// <inheritdoc cref="ILogEventSink.Emit"/>
public void Emit(LogEvent logEvent)
{
Expand All @@ -98,4 +99,43 @@ public void Emit(LogEvent logEvent)
}

}

internal class SelfLogCallbackListener<TEcsDocument> : IChannelCallbacks<TEcsDocument, BulkResponse> where TEcsDocument : EcsDocument, new()
{
public Action<Exception>? ExportExceptionCallback { get; }
public Action<BulkResponse, IWriteTrackingBuffer>? ExportResponseCallback { get; }

// ReSharper disable UnassignedGetOnlyAutoProperty
public Action<int, int>? ExportItemsAttemptCallback { get; }
public Action<IReadOnlyCollection<TEcsDocument>>? ExportMaxRetriesCallback { get; }
public Action<IReadOnlyCollection<TEcsDocument>>? ExportRetryCallback { get; }
public Action? PublishToInboundChannelCallback { get; }
public Action? PublishToInboundChannelFailureCallback { get; }
public Action? PublishToOutboundChannelCallback { get; }
public Action? OutboundChannelStartedCallback { get; }
public Action? OutboundChannelExitedCallback { get; }
public Action? InboundChannelStartedCallback { get; }
public Action? PublishToOutboundChannelFailureCallback { get; }
public Action? ExportBufferCallback { get; }
public Action<int>? ExportRetryableCountCallback { get; }
// ReSharper enable UnassignedGetOnlyAutoProperty

public SelfLogCallbackListener(ElasticsearchSinkOptions<TEcsDocument> options)
{
ExportExceptionCallback = e =>
{
SelfLog.WriteLine("Observed an exception while writing to {0}", options.DataStream);
SelfLog.WriteLine("{0}", e);
};
ExportResponseCallback = (response, _) =>
{
var errorItems = response.Items.Where(i => i.Status >= 300).ToList();
if (response.TryGetElasticsearchServerError(out var error))
SelfLog.WriteLine("{0}", error);
foreach (var errorItem in errorItems)
SelfLog.WriteLine("{0}", $"Failed to {errorItem.Action} document status: ${errorItem.Status}, error: ${errorItem.Error}");

};
}
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#nullable enable
using System.Collections.Generic;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Elastic.Channels.Diagnostics;
using Elastic.CommonSchema;
using Elastic.CommonSchema.Elasticsearch;
using Elastic.CommonSchema.Serialization;
using Elastic.Ingest.Elasticsearch.DataStreams;
using Elastic.Ingest.Elasticsearch.Serialization;

namespace Elastic.Ingest.Elasticsearch.CommonSchema
{
Expand All @@ -15,8 +18,15 @@ namespace Elastic.Ingest.Elasticsearch.CommonSchema
public class EcsDataStreamChannel<TEcsDocument> : DataStreamChannel<TEcsDocument>
where TEcsDocument : EcsDocument
{

/// <inheritdoc cref="EcsDataStreamChannel{TEcsDocument}"/>
public EcsDataStreamChannel(DataStreamChannelOptions<TEcsDocument> options) : this(options, null) { }

/// <inheritdoc cref="EcsDataStreamChannel{TEcsDocument}"/>
public EcsDataStreamChannel(DataStreamChannelOptions<TEcsDocument> options) : base(options) =>
public EcsDataStreamChannel(
DataStreamChannelOptions<TEcsDocument> options,
ICollection<IChannelCallbacks<TEcsDocument, BulkResponse>>? callbackListeners
) : base(options, callbackListeners) =>
options.WriteEvent = async (stream, ctx, @event) =>
await JsonSerializer.SerializeAsync(stream, @event, typeof(TEcsDocument), EcsJsonConfiguration.SerializerOptions, ctx)
.ConfigureAwait(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Elastic.Ingest.Elasticsearch" Version="0.3.2" />
<PackageReference Include="Elastic.Ingest.Elasticsearch" Version="0.4.3" />
</ItemGroup>

</Project>
5 changes: 5 additions & 0 deletions src/Elasticsearch.Extensions.Logging/ElasticsearchLogger.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Text;
using Elastic.CommonSchema;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elasticsearch.Extensions.Logging.Options;
using Microsoft.Extensions.Logging;

Expand All @@ -24,6 +25,9 @@ public class ElasticsearchLogger : ILogger
private readonly ElasticsearchLoggerOptions _options;
private readonly IExternalScopeProvider? _scopeProvider;

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener { get; }

internal ElasticsearchLogger(
string categoryName,
IBufferedChannel<LogEvent> channel,
Expand All @@ -35,6 +39,7 @@ internal ElasticsearchLogger(
_channel = channel;
_options = options;
_scopeProvider = scopeProvider;
DiagnosticsListener = channel.DiagnosticsListener;
}

/// <inheritdoc cref="ILogger.BeginScope{TState}"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Linq;
using System.Threading;
using Elastic.Channels;
using Elastic.Channels.Diagnostics;
using Elastic.Ingest.Elasticsearch;
using Elastic.Ingest.Elasticsearch.CommonSchema;
using Elastic.Ingest.Elasticsearch.DataStreams;
Expand All @@ -32,6 +33,9 @@ public class ElasticsearchLoggerProvider : ILoggerProvider, ISupportExternalScop
private IExternalScopeProvider? _scopeProvider;
private IBufferedChannel<LogEvent> _shipper;

/// <inheritdoc cref="IChannelDiagnosticsListener"/>
public IChannelDiagnosticsListener? DiagnosticsListener { get; }

/// <inheritdoc cref="ElasticsearchLoggerProvider"/>
public ElasticsearchLoggerProvider(IOptionsMonitor<ElasticsearchLoggerOptions> options,
IEnumerable<IChannelSetup> channelConfigurations
Expand All @@ -46,6 +50,7 @@ IEnumerable<IChannelSetup> channelConfigurations

_shipper = CreatIngestChannel(options.CurrentValue);
_optionsReloadToken = _options.OnChange(o => ReloadShipper(o));
DiagnosticsListener = _shipper.DiagnosticsListener;
}

// ReSharper disable once AutoPropertyCanBeMadeGetOnly.Global
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,20 +52,21 @@ private static IConfig CreateDefaultConfig()
public void BenchmarkingPersistsResults()
{
var url = Client.ElasticsearchClientSettings.NodePool.Nodes.First().Uri;
var listener = new ChannelListener<BenchmarkDocument, BulkResponse>();
IChannelDiagnosticsListener listener = null;
var options = new ElasticsearchBenchmarkExporterOptions(url)
{
GitBranch = "externally-provided-branch",
GitCommitMessage = "externally provided git commit message",
GitRepositoryIdentifier = "repository",
BootstrapMethod = BootstrapMethod.Silent,
ChannelOptionsCallback = (o) => listener.Register(o)
ChannelDiagnosticsCallback = (l) => listener = l
};
var exporter = new ElasticsearchBenchmarkExporter(options);
var config = CreateDefaultConfig().AddExporter(exporter);
var summary = BenchmarkRunner.Run(typeof(Md5VsSha256), config);

// ensure publication was success
listener.Should().NotBeNull();
listener.PublishSuccess.Should().BeTrue("{0}", listener);

if (summary.HasCriticalValidationErrors)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<TargetFramework>net6.0</TargetFramework>
<LangVersion>latest</LangVersion>
<IsTestProject>False</IsTestProject>
<Nullable>enable</Nullable>
</PropertyGroup>

<ItemGroup>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,9 @@

[assembly: TestFramework("Elastic.Elasticsearch.Xunit.Sdk.ElasticTestFramework", "Elastic.Elasticsearch.Xunit")]

namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests
{
public class SerilogCluster : TestClusterBase
{
public SerilogCluster() : base(9205) { }
namespace Elastic.CommonSchema.Serilog.Sinks.IntegrationTests;

}
public class SerilogCluster : TestClusterBase
{
public SerilogCluster() : base(9205) { }
}
Loading