Skip to content
This repository has been archived by the owner on Jun 1, 2024. It is now read-only.

Added support for controlling the op_type when indexing #356

Merged
merged 5 commits into from
Sep 18, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
## Changelog

* Disable dot-escaping for field names, because ELK already supports dots in field names.
* Support for explicitly setting `Options.TypeName` to `null` this will remove the
deprecated `_type` from the bulk payload being sent to Elastic. Earlier an exception was
thrown if the `Options.TypeName` was `null`. _If you're using `AutoRegisterTemplateVersion.ESv7`
we'll not force the type to `_doc` if it's set to `null`. This is a small step towards support
for writing logs to Elastic v8. #345
* Support for setting the Elastic `op_type` e.g. `index` or `create` for bulk actions.
This is a requirement for writing to [data streams](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/data-streams.html)
that's only supporting `create`. Data streams is a more slipped stream way to handle rolling
indices, that previous required an ILM, template and a magic write alias. Now it's more integrated
in Elasticsearch and Kibana. If you're running Elastic `7.9` you'll get rolling indices out of the box
with this configuration:
```
TypeName = null,
IndexFormat = "logs-my-stream",
BatchAction = ElasticOpType.Create,
```
_Note: that current templates doesn't support data streams._ #355

8.2
* Allow the use of templateCustomSettings when reading from settings json (#315)
Expand Down
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ This example shows the options that are currently available when using the appSe
<add key="serilog:write-to:Elasticsearch.typeName" value="myCustomLogEventType"/>
<add key="serilog:write-to:Elasticsearch.pipelineName" value="myCustomPipelineName"/>
<add key="serilog:write-to:Elasticsearch.batchPostingLimit" value="50"/>
<add key="serilog:write-to:Elasticsearch.batchAction" value="create"/>
<add key="serilog:write-to:Elasticsearch.period" value="2"/>
<add key="serilog:write-to:Elasticsearch.inlineFields" value="true"/>
<add key="serilog:write-to:Elasticsearch.restrictedToMinimumLevel" value="Warning"/>
Expand Down Expand Up @@ -179,6 +180,7 @@ In your `appsettings.json` file, under the `Serilog` node, :
"typeName": "myCustomLogEventType",
"pipelineName": "myCustomPipelineName",
"batchPostingLimit": 50,
"batchAction": "create",
"period": 2,
"inlineFields": true,
"restrictedToMinimumLevel": "Warning",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public static LoggerConfiguration Elasticsearch(
/// <param name="failureSink">Sink to use when Elasticsearch is unable to accept the events. This is optionally and depends on the EmitEventFailure setting.</param>
/// <param name="singleEventSizePostingLimit"><see cref="ElasticsearchSinkOptions.SingleEventSizePostingLimit"/>The maximum length of an event allowed to be posted to Elasticsearch.default null</param>
/// <param name="templateCustomSettings">Add custom elasticsearch settings to the template</param>
/// <param name="batchAction">Configures the OpType being used when inserting document in batch. Must be set to create for data streams.</param>
orjan marked this conversation as resolved.
Show resolved Hide resolved
/// <returns>LoggerConfiguration object</returns>
/// <exception cref="ArgumentNullException"><paramref name="nodeUris"/> is <see langword="null" />.</exception>
public static LoggerConfiguration Elasticsearch(
Expand Down Expand Up @@ -180,7 +181,8 @@ public static LoggerConfiguration Elasticsearch(
ILogEventSink failureSink = null,
long? singleEventSizePostingLimit = null,
int? bufferFileCountLimit = null,
Dictionary<string,string> templateCustomSettings = null)
Dictionary<string,string> templateCustomSettings = null,
ElasticOpType batchAction = ElasticOpType.Index)
orjan marked this conversation as resolved.
Show resolved Hide resolved
{
if (string.IsNullOrEmpty(nodeUris))
throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified.");
Expand Down Expand Up @@ -208,6 +210,7 @@ public static LoggerConfiguration Elasticsearch(
}

options.BatchPostingLimit = batchPostingLimit;
options.BatchAction = batchAction;
options.SingleEventSizePostingLimit = singleEventSizePostingLimit;
options.Period = TimeSpan.FromSeconds(period);
options.InlineFields = inlineFields;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
using System;
using System.Collections.Generic;
using System.Text;
using Elasticsearch.Net;
using Serilog.Core;
using Serilog.Events;

Expand Down Expand Up @@ -55,15 +54,16 @@ public DurableElasticsearchSink(ElasticsearchSinkOptions options)


var elasticSearchLogClient = new ElasticsearchLogClient(
elasticLowLevelClient: _state.Client,
cleanPayload: _state.Options.BufferCleanPayload);
elasticLowLevelClient: _state.Client,
cleanPayload: _state.Options.BufferCleanPayload,
elasticOpType: _state.Options.BatchAction);

var payloadReader = new ElasticsearchPayloadReader(
pipelineName: _state.Options.PipelineName,
typeName:_state.Options.TypeName,
serialize:_state.Serialize,
getIndexForEvent: _state.GetBufferedIndexForEvent
);
getIndexForEvent: _state.GetBufferedIndexForEvent,
elasticOpType: _state.Options.BatchAction);

_shipper = new ElasticsearchLogShipper(
bufferBaseFilename: _state.Options.BufferBaseFilename,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.ExceptionServices;
using System.Text;
using System.Text.RegularExpressions;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Serilog.Debugging;
Expand All @@ -17,17 +14,21 @@ public class ElasticsearchLogClient : ILogClient<List<string>>
{
private readonly IElasticLowLevelClient _elasticLowLevelClient;
private readonly Func<string, long?, string, string> _cleanPayload;
private readonly ElasticOpType _elasticOpType;

/// <summary>
///
/// </summary>
/// <param name="elasticLowLevelClient"></param>
/// <param name="cleanPayload"></param>
/// <param name="elasticOpType"></param>
public ElasticsearchLogClient(IElasticLowLevelClient elasticLowLevelClient,
Func<string, long?, string, string> cleanPayload)
Func<string, long?, string, string> cleanPayload,
ElasticOpType elasticOpType)
{
_elasticLowLevelClient = elasticLowLevelClient;
_cleanPayload = cleanPayload;
_elasticOpType = elasticOpType;
}

public async Task<SentPayloadResult> SendPayloadAsync(List<string> payload)
Expand Down Expand Up @@ -85,7 +86,7 @@ private InvalidResult GetInvalidPayloadAsync(DynamicResponse baseResult, List<st
bool hasErrors = false;
foreach (dynamic item in items)
{
var itemIndex = item?["index"];
var itemIndex = item?[ElasticsearchSink.BulkAction(_elasticOpType)];
long? status = itemIndex?["status"];
i++;
if (!status.HasValue || status < 300)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
using System.Globalization;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Elasticsearch.Net;

namespace Serilog.Sinks.Elasticsearch.Durable
{
Expand All @@ -17,6 +16,7 @@ public class ElasticsearchPayloadReader: APayloadReader<List<string>>
private readonly string _typeName;
private readonly Func<object, string> _serialize;
private readonly Func<string, DateTime,string> _getIndexForEvent;
private readonly ElasticOpType _elasticOpType;
private List<string> _payload;
private int _count;
private DateTime _date;
Expand All @@ -28,12 +28,15 @@ public class ElasticsearchPayloadReader: APayloadReader<List<string>>
/// <param name="typeName"></param>
/// <param name="serialize"></param>
/// <param name="getIndexForEvent"></param>
public ElasticsearchPayloadReader(string pipelineName,string typeName, Func<object,string> serialize,Func<string,DateTime,string> getIndexForEvent)
/// <param name="elasticOpType"></param>
public ElasticsearchPayloadReader(string pipelineName, string typeName, Func<object, string> serialize,
Func<string, DateTime, string> getIndexForEvent, ElasticOpType elasticOpType)
{
_pipelineName = pipelineName;
_typeName = typeName;
_serialize = serialize;
_getIndexForEvent = getIndexForEvent;
_elasticOpType = elasticOpType;
}

/// <summary>
Expand Down Expand Up @@ -80,18 +83,13 @@ protected override List<string> FinishPayLoad()
protected override void AddToPayLoad(string nextLine)
{
var indexName = _getIndexForEvent(nextLine, _date);
var action = default(object);
var action = ElasticsearchSink.CreateElasticAction(
opType: _elasticOpType,
indexName: indexName, pipelineName: _pipelineName,
id: _count + "_" + Guid.NewGuid(),
mappingType: _typeName);
var actionJson = LowLevelRequestResponseSerializer.Instance.SerializeToString(action);

if (string.IsNullOrWhiteSpace(_pipelineName))
{
action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid() } };
}
else
{
action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid(), pipeline = _pipelineName } };
}

var actionJson = _serialize(action);
_payload.Add(actionJson);
_payload.Add(nextLine);
_count++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading.Tasks;
using Elasticsearch.Net;
using Elasticsearch.Net.Specification.SecurityApi;
using Serilog.Debugging;
using Serilog.Events;
using Serilog.Sinks.PeriodicBatching;
Expand Down Expand Up @@ -82,7 +82,7 @@ protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
if (events == null || !events.Any())
return Task.FromResult<T>(default(T));

var payload = CreatePlayLoad<T>(events);
var payload = CreatePlayLoad(events);
return _state.Client.BulkAsync<T>(PostData.MultiJson(payload));
}

Expand All @@ -97,7 +97,7 @@ protected override async Task EmitBatchAsync(IEnumerable<LogEvent> events)
if (events == null || !events.Any())
return null;

var payload = CreatePlayLoad<T>(events);
var payload = CreatePlayLoad(events);
return _state.Client.Bulk<T>(PostData.MultiJson(payload));
}

Expand Down Expand Up @@ -165,8 +165,7 @@ private static bool HasProperty(dynamic settings, string name)
return settings.GetType().GetProperty(name) != null;
}

private IEnumerable<string> CreatePlayLoad<T>(IEnumerable<LogEvent> events)
where T : class, IElasticsearchResponse, new()
private IEnumerable<string> CreatePlayLoad(IEnumerable<LogEvent> events)
{
if (!_state.TemplateRegistrationSuccess && _state.Options.RegisterTemplateFailure == RegisterTemplateRecovery.FailSink)
{
Expand All @@ -177,19 +176,15 @@ private IEnumerable<string> CreatePlayLoad<T>(IEnumerable<LogEvent> events)
foreach (var e in events)
{
var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime());
var action = default(object);

var pipelineName = _state.Options.PipelineNameDecider?.Invoke(e) ?? _state.Options.PipelineName;
if (string.IsNullOrWhiteSpace(pipelineName))
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName } };
}
else
{
action = new { index = new { _index = indexName, _type = _state.Options.TypeName, pipeline = pipelineName } };
}
var actionJson = _state.Serialize(action);
payload.Add(actionJson);

var action = CreateElasticAction(
opType: _state.Options.BatchAction,
indexName: indexName,
pipelineName: pipelineName,
mappingType: _state.Options.TypeName);
payload.Add(LowLevelRequestResponseSerializer.Instance.SerializeToString(action));

var sw = new StringWriter();
_state.Formatter.Format(e, sw);
payload.Add(sw.ToString());
Expand All @@ -204,10 +199,15 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
if (result.Success && result.Body?["errors"] == true)
{
var indexer = 0;
var opType = BulkAction(_state.Options.BatchAction);
var items = result.Body["items"];
foreach (var item in items)
{
if (item["index"] != null && HasProperty(item["index"], "error") && item["index"]["error"] != null)
var action = item.ContainsKey(opType)
? item[opType]
: null;

if (action != null && action.ContainsKey("error"))
{
var e = events.ElementAt(indexer);
if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToSelfLog))
Expand All @@ -216,8 +216,8 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
SelfLog.WriteLine(
"Failed to store event with template '{0}' into Elasticsearch. Elasticsearch reports for index {1} the following: {2}",
e.MessageTemplate,
item["index"]["_index"],
_state.Serialize(item["index"]["error"]));
action["_index"],
_state.Serialize(action["error"]));
}

if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToFailureSink) &&
Expand Down Expand Up @@ -251,7 +251,6 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
_state.Options.FailureCallback);
}
}

}
indexer++;
}
Expand All @@ -261,5 +260,69 @@ private void HandleResponse(IEnumerable<LogEvent> events, DynamicResponse result
HandleException(result.OriginalException, events);
}
}

internal static string BulkAction(ElasticOpType elasticOpType) =>
elasticOpType == ElasticOpType.Create
? "create"
: "index";

internal static object CreateElasticAction(ElasticOpType opType, string indexName, string pipelineName = null, string id = null, string mappingType = null)
{
var actionPayload = new ElasticActionPayload(
indexName: indexName,
pipeline: string.IsNullOrWhiteSpace(pipelineName) ? null : pipelineName,
id: id,
mappingType: mappingType
);

var action = opType == ElasticOpType.Create
? (object) new ElasticCreateAction(actionPayload)
: new ElasticIndexAction(actionPayload);
return action;
}

sealed class ElasticCreateAction
orjan marked this conversation as resolved.
Show resolved Hide resolved
{
public ElasticCreateAction(ElasticActionPayload payload)
{
Payload = payload;
}

[DataMember(Name = "create")]
public ElasticActionPayload Payload { get; }
}

sealed class ElasticIndexAction
{
public ElasticIndexAction(ElasticActionPayload payload)
{
Payload = payload;
}

[DataMember(Name = "index")]
public ElasticActionPayload Payload { get; }
}

sealed class ElasticActionPayload {
public ElasticActionPayload(string indexName, string pipeline = null, string id = null, string mappingType = null)
{
IndexName = indexName;
Pipeline = pipeline;
Id = id;
MappingType = mappingType;
}

[DataMember(Name = "_type")]
public string MappingType { get; }

[DataMember(Name = "_index")]
public string IndexName { get; }

[DataMember(Name = "pipeline")]
public string Pipeline { get; }

[DataMember(Name = "_id")]
public string Id { get; }
}
}
}
Loading