This repository has been archived by the owner on Jun 1, 2024. It is now read-only.

feature: add option to skip sending oversized data to Elasticsearch #169

Merged
@@ -104,142 +104,6 @@ public static LoggerConfiguration Elasticsearch(
AutoRegisterTemplateVersion.ESv2, false, RegisterTemplateRecovery.IndexAnyway, null, null, null);
}

/// <summary>
/// Overload to allow basic configuration through AppSettings.
/// </summary>
/// <param name="loggerSinkConfiguration">Options for the sink.</param>
/// <param name="nodeUris">A comma or semi column separated list of URIs for Elasticsearch nodes.</param>
/// <param name="indexFormat"><see cref="ElasticsearchSinkOptions.IndexFormat"/></param>
/// <param name="templateName"><see cref="ElasticsearchSinkOptions.TemplateName"/></param>
/// <param name="typeName"><see cref="ElasticsearchSinkOptions.TypeName"/></param>
/// <param name="batchPostingLimit"><see cref="ElasticsearchSinkOptions.BatchPostingLimit"/></param>
/// <param name="period"><see cref="ElasticsearchSinkOptions.Period"/></param>
/// <param name="inlineFields"><see cref="ElasticsearchSinkOptions.InlineFields"/></param>
/// <param name="restrictedToMinimumLevel">The minimum log event level required in order to write an event to the sink. Ignored when <paramref name="levelSwitch"/> is specified.</param>
/// <param name="levelSwitch">A switch allowing the pass-through minimum level to be changed at runtime.</param>
/// <param name="bufferBaseFilename"><see cref="ElasticsearchSinkOptions.BufferBaseFilename"/></param>
/// <param name="bufferFileSizeLimitBytes"><see cref="ElasticsearchSinkOptions.BufferFileSizeLimitBytes"/></param>
/// <param name="bufferLogShippingInterval"><see cref="ElasticsearchSinkOptions.BufferLogShippingInterval"/></param>
/// <param name="connectionGlobalHeaders">A comma or semi column separated list of key value pairs of headers to be added to each elastic http request</param>
/// <param name="connectionTimeout"><see cref="ElasticsearchSinkOptions.ConnectionTimeout"/>The connection timeout (in seconds) when sending bulk operations to elasticsearch (defaults to 5).</param>
/// <param name="emitEventFailure"><see cref="ElasticsearchSinkOptions.EmitEventFailure"/>Specifies how failing emits should be handled.</param>
/// <param name="queueSizeLimit"><see cref="ElasticsearchSinkOptions.QueueSizeLimit"/>The maximum number of events that will be held in-memory while waiting to ship them to Elasticsearch. Beyond this limit, events will be dropped. The default is 100,000. Has no effect on durable log shipping.</param>
/// <param name="pipelineName"><see cref="ElasticsearchSinkOptions.PipelineName"/>Name the Pipeline where log events are sent to sink. Please note that the Pipeline should be existing before the usage starts.</param>
/// <param name="autoRegisterTemplate"><see cref="ElasticsearchSinkOptions.AutoRegisterTemplate"/>When set to true the sink will register an index template for the logs in elasticsearch.</param>
/// <param name="autoRegisterTemplateVersion"><see cref="ElasticsearchSinkOptions.AutoRegisterTemplateVersion"/>When using the AutoRegisterTemplate feature, this allows to set the Elasticsearch version. Depending on the version, a template will be selected. Defaults to pre 5.0.</param>
/// <param name="overwriteTemplate"><see cref="ElasticsearchSinkOptions.OverwriteTemplate"/>When using the AutoRegisterTemplate feature, this allows you to overwrite the template in Elasticsearch if it already exists. Defaults to false</param>
/// <param name="registerTemplateFailure"><see cref="ElasticsearchSinkOptions.RegisterTemplateFailure"/>Specifies the option on how to handle failures when writing the template to Elasticsearch. This is only applicable when using the AutoRegisterTemplate option.</param>
/// <param name="deadLetterIndexName"><see cref="ElasticsearchSinkOptions.DeadLetterIndexName"/>Optionally set this value to the name of the index that should be used when the template cannot be written to ES.</param>
/// <param name="numberOfShards"><see cref="ElasticsearchSinkOptions.NumberOfShards"/>The default number of shards.</param>
/// <param name="numberOfReplicas"><see cref="ElasticsearchSinkOptions.NumberOfReplicas"/>The default number of replicas.</param>
/// <returns>LoggerConfiguration object</returns>
/// <exception cref="ArgumentNullException"><paramref name="nodeUris"/> is <see langword="null" />.</exception>
public static LoggerConfiguration Elasticsearch(
this LoggerSinkConfiguration loggerSinkConfiguration,
string nodeUris,
string indexFormat = null,
string templateName = null,
string typeName = "logevent",
int batchPostingLimit = 50,
int period = 2,
bool inlineFields = false,
LogEventLevel restrictedToMinimumLevel = LevelAlias.Minimum,
string bufferBaseFilename = null,
long? bufferFileSizeLimitBytes = null,
long bufferLogShippingInterval = 5000,
string connectionGlobalHeaders = null,
LoggingLevelSwitch levelSwitch = null,
int connectionTimeout = 5,
EmitEventFailureHandling emitEventFailure = EmitEventFailureHandling.WriteToSelfLog,
int queueSizeLimit = 100000,
string pipelineName = null,
bool autoRegisterTemplate = false,
AutoRegisterTemplateVersion autoRegisterTemplateVersion = AutoRegisterTemplateVersion.ESv2,
bool overwriteTemplate = false,
RegisterTemplateRecovery registerTemplateFailure = RegisterTemplateRecovery.IndexAnyway,
string deadLetterIndexName = null,
int? numberOfShards = null,
int? numberOfReplicas = null)
{
if (string.IsNullOrEmpty(nodeUris))
throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified.");

IEnumerable<Uri> nodes = nodeUris
.Split(new[] { ',', ';' }, StringSplitOptions.RemoveEmptyEntries)
.Select(uriString => new Uri(uriString));

var options = new ElasticsearchSinkOptions(nodes);

if (!string.IsNullOrWhiteSpace(indexFormat))
{
options.IndexFormat = indexFormat;
}

if (!string.IsNullOrWhiteSpace(templateName))
{
options.AutoRegisterTemplate = true;
options.TemplateName = templateName;
}

if (!string.IsNullOrWhiteSpace(typeName))
{
options.TypeName = typeName;
}

options.BatchPostingLimit = batchPostingLimit;
options.Period = TimeSpan.FromSeconds(period);
options.InlineFields = inlineFields;
options.MinimumLogEventLevel = restrictedToMinimumLevel;
options.LevelSwitch = levelSwitch;

if (!string.IsNullOrWhiteSpace(bufferBaseFilename))
{
Path.GetFullPath(bufferBaseFilename); // validate path
options.BufferBaseFilename = bufferBaseFilename;
}

if (bufferFileSizeLimitBytes.HasValue)
{
options.BufferFileSizeLimitBytes = bufferFileSizeLimitBytes.Value;
}

options.BufferLogShippingInterval = TimeSpan.FromMilliseconds(bufferLogShippingInterval);

if (!string.IsNullOrWhiteSpace(connectionGlobalHeaders))
{
NameValueCollection headers = new NameValueCollection();
connectionGlobalHeaders
.Split(new[] { ',', ';' }, StringSplitOptions.RemoveEmptyEntries)
.ToList()
.ForEach(headerValueStr =>
{
var headerValue = headerValueStr.Split(new[] { '=' }, 2, StringSplitOptions.RemoveEmptyEntries);
headers.Add(headerValue[0], headerValue[1]);
});

options.ModifyConnectionSettings = (c) => c.GlobalHeaders(headers);
}

options.ConnectionTimeout = TimeSpan.FromSeconds(connectionTimeout);
options.EmitEventFailure = emitEventFailure;
options.QueueSizeLimit = queueSizeLimit;
options.PipelineName = pipelineName;

options.AutoRegisterTemplate = autoRegisterTemplate;
options.AutoRegisterTemplateVersion = autoRegisterTemplateVersion;
options.RegisterTemplateFailure = registerTemplateFailure;
options.OverwriteTemplate = overwriteTemplate;
options.NumberOfShards = numberOfShards;
options.NumberOfReplicas = numberOfReplicas;

if (!string.IsNullOrWhiteSpace(deadLetterIndexName))
{
options.DeadLetterIndexName = deadLetterIndexName;
}

return Elasticsearch(loggerSinkConfiguration, options);
}

/// <summary>
/// Overload to allow basic configuration through AppSettings.
/// </summary>
@@ -275,6 +139,7 @@ public static LoggerConfiguration Elasticsearch(
/// <param name="customFormatter">Customizes the formatter used when converting log events into ElasticSearch documents. Please note that the formatter output must be valid JSON :)</param>
/// <param name="customDurableFormatter">Customizes the formatter used when converting log events into the durable sink. Please note that the formatter output must be valid JSON :)</param>
/// <param name="failureSink">Sink to use when Elasticsearch is unable to accept the events. This is optionally and depends on the EmitEventFailure setting.</param>
/// <param name="singleEventSizePostingLimit"><see cref="ElasticsearchSinkOptions.SingleEventSizePostingLimit"/>The maximum length of an event allowed to be posted to Elasticsearch.</param>
/// <returns>LoggerConfiguration object</returns>
/// <exception cref="ArgumentNullException"><paramref name="nodeUris"/> is <see langword="null" />.</exception>
public static LoggerConfiguration Elasticsearch(
@@ -309,7 +174,8 @@ public static LoggerConfiguration Elasticsearch(
IConnectionPool connectionPool = null,
ITextFormatter customFormatter = null,
ITextFormatter customDurableFormatter = null,
ILogEventSink failureSink = null)
ILogEventSink failureSink = null,
int singleEventSizePostingLimit = 0)
{
if (string.IsNullOrEmpty(nodeUris))
throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified.");
@@ -337,6 +203,7 @@ public static LoggerConfiguration Elasticsearch(
}

options.BatchPostingLimit = batchPostingLimit;
options.SingleEventSizePostingLimit = singleEventSizePostingLimit;
options.Period = TimeSpan.FromSeconds(period);
options.InlineFields = inlineFields;
options.MinimumLogEventLevel = restrictedToMinimumLevel;
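For illustration, a hedged sketch of invoking this overload with the new parameter. Durable buffering is enabled via bufferBaseFilename, since the size check runs in the durable log shipper (see below); the URIs, buffer path, header value, and the 512 KB limit are illustrative, and the other parameter names are taken from the overload documentation above.

using Serilog;

var log = new LoggerConfiguration()
    .WriteTo.Elasticsearch(
        nodeUris: "http://localhost:9200;http://localhost:9201",     // split on ',' or ';'
        bufferBaseFilename: "./buffer/serilog-elastic",              // enables the durable shipper
        connectionGlobalHeaders: "Authorization=Basic dXNlcjpwYXNz", // key=value pairs, ',' or ';' separated
        singleEventSizePostingLimit: 512 * 1024)                     // skip serialized events longer than this
    .CreateLogger();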
@@ -34,6 +34,7 @@ internal class ElasticsearchLogShipper : IDisposable
private readonly ElasticsearchSinkState _state;

readonly int _batchPostingLimit;
readonly int _singleEventSizePostingLimit;
#if NO_TIMER
readonly PortableTimer _timer;
#else
@@ -49,13 +50,21 @@ internal class ElasticsearchLogShipper : IDisposable

bool _didRegisterTemplateIfNeeded;

internal ElasticsearchLogShipper(ElasticsearchSinkOptions option)
{
_batchPostingLimit = option.BatchPostingLimit;
_singleEventSizePostingLimit = option.SingleEventSizePostingLimit;
_state = ElasticsearchSinkState.Create(option);
}

internal ElasticsearchLogShipper(ElasticsearchSinkState state)
{
_state = state;
_connectionSchedule = new ExponentialBackoffConnectionSchedule(_state.Options.BufferLogShippingInterval ?? TimeSpan.FromSeconds(5));

_batchPostingLimit = _state.Options.BatchPostingLimit;
_singleEventSizePostingLimit = _state.Options.SingleEventSizePostingLimit;
_bookmarkFilename = Path.GetFullPath(_state.Options.BufferBaseFilename + ".bookmark");
_logFolder = Path.GetDirectoryName(_bookmarkFilename);
_candidateSearchPath = Path.GetFileName(_state.Options.BufferBaseFilename) + "*.json";
@@ -133,7 +142,7 @@ void OnTick()
_didRegisterTemplateIfNeeded = true;
}

var count = 0;
var hasData = false;

do
{
@@ -142,23 +151,18 @@

using (var bookmark = System.IO.File.Open(_bookmarkFilename, FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.Read))
{
long nextLineBeginsAtOffset;
string currentFilePath;

TryReadBookmark(bookmark, out nextLineBeginsAtOffset, out currentFilePath);

TryReadBookmark(bookmark, out long currentLineBeginsAtOffset, out string currentFilePath);
var fileSet = GetFileSet();

if (currentFilePath == null || !System.IO.File.Exists(currentFilePath))
{
nextLineBeginsAtOffset = 0;
currentLineBeginsAtOffset = 0;
currentFilePath = fileSet.FirstOrDefault();
}

if (currentFilePath == null) continue;

count = 0;

hasData = false;
// file name pattern: whatever-bla-bla-20150218.json, whatever-bla-bla-20150218_1.json, etc.
var lastToken = currentFilePath.Split('-').Last();

@@ -173,33 +177,23 @@
var indexName = _state.GetIndexForEvent(null, date);

var payload = new List<string>();
var nextLineBeginsAtOffset = currentLineBeginsAtOffset;

using (var current = System.IO.File.Open(currentFilePath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
current.Position = nextLineBeginsAtOffset;

string nextLine;
while (count < _batchPostingLimit && TryReadLine(current, ref nextLineBeginsAtOffset, out nextLine))
{
var action = default(object);

if (string.IsNullOrWhiteSpace(_state.Options.PipelineName))
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, _id = count + "_" + Guid.NewGuid() } };
}
else
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, _id = count + "_" + Guid.NewGuid(), pipeline = _state.Options.PipelineName } };
}
var actionJson = _state.Serialize(action);
payload.Add(actionJson);
payload.Add(nextLine);
++count;
}
nextLineBeginsAtOffset = CreatePayLoad(current, payload, indexName, currentLineBeginsAtOffset, currentFilePath);
}

if (count > 0)
if (nextLineBeginsAtOffset > currentLineBeginsAtOffset)
{
hasData = true;

if (!payload.Any())
{
// Nothing to send; all events in this batch were skipped, so just advance the bookmark to the next offset
WriteBookmark(bookmark, nextLineBeginsAtOffset, currentFilePath);
continue;
}

var response = _state.Client.Bulk<DynamicResponse>(PostData.MultiJson(payload));

@@ -242,7 +236,7 @@ void OnTick()
}
}
}
while (count == _batchPostingLimit);
while (hasData);
}
catch (Exception ex)
{
@@ -261,6 +255,58 @@ void OnTick()
}
}

private long CreatePayLoad(
Stream current,
List<string> payload,
string indexName,
long currentLineBeginsAtOffset,
string currentFilePath)
{
current.Position = currentLineBeginsAtOffset;

var count = 0;
string nextLine;
var currentPosition = current.Position;
var nextPosition = currentPosition;
while (count < _batchPostingLimit && TryReadLine(current, ref nextPosition, out nextLine))
{
if (_singleEventSizePostingLimit > 0)
{
if (nextLine.Length > _singleEventSizePostingLimit)
{
SelfLog.WriteLine(
"File: {0}. Skip sending to ElasticSearch event at position offset {1}. Reason: {2}.",
currentFilePath,
currentPosition,
$"Event has line length {nextLine.Length} over limit of {_singleEventSizePostingLimit}");

currentPosition = nextPosition;
continue;
}

currentPosition = nextPosition;
}

var action = default(object);

if (string.IsNullOrWhiteSpace(_state.Options.PipelineName))
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, _id = count + "_" + Guid.NewGuid() } };
}
else
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, _id = count + "_" + Guid.NewGuid(), pipeline = _state.Options.PipelineName } };
}

var actionJson = _state.Serialize(action);
payload.Add(actionJson);
payload.Add(nextLine);
++count;
}
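// Note: nextPosition already points past every line read above, including
// any skipped oversized events, so returning it lets OnTick advance the
// bookmark and avoid re-reading those events on the next tick.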

return nextPosition;
}

static void HandleBulkResponse(DynamicResponse response, List<string> payload)
{
int i = 0;
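Skipped events are reported only through Serilog's SelfLog, so consumers must enable self-logging to see the "Skip sending to ElasticSearch event" messages emitted above. A minimal sketch; writing to standard error is just one option:

using System;
using Serilog.Debugging;

// Route the sink's diagnostic output, including the skip messages above, to stderr.
SelfLog.Enable(Console.Error);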
@@ -113,6 +113,11 @@ public class ElasticsearchSinkOptions
/// </summary>
public int BatchPostingLimit { get; set; }

///<summary>
/// The maximum length of a an event record to be sent. Defaults to: 0 (No Limit)
/// </summary>
public int SingleEventSizePostingLimit { get; set; }

///<summary>
/// The time to wait between checking for event batches. Defaults to 2 seconds.
/// </summary>
@@ -230,6 +235,7 @@ public ElasticsearchSinkOptions()
this.TypeName = "logevent";
this.Period = TimeSpan.FromSeconds(2);
this.BatchPostingLimit = 50;
this.SingleEventSizePostingLimit = 0;
this.TemplateName = "serilog-events-template";
this.ConnectionTimeout = TimeSpan.FromSeconds(5);
this.EmitEventFailure = EmitEventFailureHandling.WriteToSelfLog;
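The new option can also be set when constructing the options object directly, mirroring the constructor defaults above. A sketch with illustrative values; note that the limit compares the character length of each serialized event line, per the shipper code above.

using System;
using Serilog;
using Serilog.Sinks.Elasticsearch;

var options = new ElasticsearchSinkOptions(new[] { new Uri("http://localhost:9200") })
{
    BufferBaseFilename = "./buffer/serilog-elastic", // durable mode, where the limit is enforced
    SingleEventSizePostingLimit = 1024 * 1024        // skip events over ~1M characters; 0 (the default) means no limit
};
var log = new LoggerConfiguration().WriteTo.Elasticsearch(options).CreateLogger();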