From c9f93505165fd36c717913f8eeaefc27cd5daf00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Sj=C3=B6holm?= Date: Sun, 13 Sep 2020 20:26:08 +0200 Subject: [PATCH 1/5] Added support for controlling the op_type when indexing In order to write to data streams we'll need to set the op_type to create the default is index In order to don't serialize null values for the batch action and to not depend on custom serialization setting we're using LowLevelRequestResponseSerializer.Instance for serialization of the action. We won't throw exception when setting the TypeName to null since: - This is deprecated in v7 - Won't work in v8 --- ...gerConfigurationElasticSearchExtensions.cs | 5 +- .../Sinks/ElasticSearch/ElasticSearchSink.cs | 66 +++++++++++--- .../ElasticSearch/ElasticsearchSinkOptions.cs | 28 ++++++ .../ElasticSearch/ElasticsearchSinkState.cs | 4 - .../BulkActionTests.cs | 90 +++++++++++++++++++ .../ElasticsearchSinkTestsBase.cs | 4 +- 6 files changed, 177 insertions(+), 20 deletions(-) create mode 100644 test/Serilog.Sinks.Elasticsearch.Tests/BulkActionTests.cs diff --git a/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs b/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs index 7cbf6ca5..7e3ec182 100644 --- a/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs +++ b/src/Serilog.Sinks.Elasticsearch/LoggerConfigurationElasticSearchExtensions.cs @@ -143,6 +143,7 @@ public static LoggerConfiguration Elasticsearch( /// Sink to use when Elasticsearch is unable to accept the events. This is optionally and depends on the EmitEventFailure setting. /// The maximum length of an event allowed to be posted to Elasticsearch.default null /// Add custom elasticsearch settings to the template + /// Configures the OpType being used when inserting document in batch. Must be set to create for data streams. /// LoggerConfiguration object /// is . public static LoggerConfiguration Elasticsearch( @@ -180,7 +181,8 @@ public static LoggerConfiguration Elasticsearch( ILogEventSink failureSink = null, long? singleEventSizePostingLimit = null, int? bufferFileCountLimit = null, - Dictionary templateCustomSettings = null) + Dictionary templateCustomSettings = null, + ElasticOpType batchAction = ElasticOpType.Index) { if (string.IsNullOrEmpty(nodeUris)) throw new ArgumentNullException(nameof(nodeUris), "No Elasticsearch node(s) specified."); @@ -208,6 +210,7 @@ public static LoggerConfiguration Elasticsearch( } options.BatchPostingLimit = batchPostingLimit; + options.BatchAction = batchAction; options.SingleEventSizePostingLimit = singleEventSizePostingLimit; options.Period = TimeSpan.FromSeconds(period); options.InlineFields = inlineFields; diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs index a49bfe9f..1487cb37 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs @@ -16,9 +16,9 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Runtime.Serialization; using System.Threading.Tasks; using Elasticsearch.Net; -using Elasticsearch.Net.Specification.SecurityApi; using Serilog.Debugging; using Serilog.Events; using Serilog.Sinks.PeriodicBatching; @@ -177,19 +177,19 @@ private IEnumerable CreatePlayLoad(IEnumerable events) foreach (var e in events) { var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime()); - var action = default(object); - var pipelineName = _state.Options.PipelineNameDecider?.Invoke(e) ?? _state.Options.PipelineName; - if (string.IsNullOrWhiteSpace(pipelineName)) - { - action = new { index = new { _index = indexName, _type = _state.Options.TypeName } }; - } - else - { - action = new { index = new { _index = indexName, _type = _state.Options.TypeName, pipeline = pipelineName } }; - } - var actionJson = _state.Serialize(action); - payload.Add(actionJson); + + var actionPayload = new ElasticActionPayload( + indexName, + string.IsNullOrWhiteSpace(pipelineName) ? null : pipelineName, + _state.Options.TypeName + ); + + var action = _state.Options.BatchAction == ElasticOpType.Create + ? (object) new ElasticCreateAction(actionPayload) + : new ElasticIndexAction(actionPayload); + payload.Add(LowLevelRequestResponseSerializer.Instance.SerializeToString(action)); + var sw = new StringWriter(); _state.Formatter.Format(e, sw); payload.Add(sw.ToString()); @@ -261,5 +261,45 @@ private void HandleResponse(IEnumerable events, DynamicResponse result HandleException(result.OriginalException, events); } } + + sealed class ElasticCreateAction + { + public ElasticCreateAction(ElasticActionPayload payload) + { + Payload = payload; + } + + [DataMember(Name = "create")] + public ElasticActionPayload Payload { get; } + } + + sealed class ElasticIndexAction + { + public ElasticIndexAction(ElasticActionPayload payload) + { + Payload = payload; + } + + [DataMember(Name = "index")] + public ElasticActionPayload Payload { get; } + } + + sealed class ElasticActionPayload { + public ElasticActionPayload(string indexName, string pipeline = null, string mappingType = null) + { + IndexName = indexName; + Pipeline = pipeline; + MappingType = mappingType; + } + + [DataMember(Name = "_type")] + public string MappingType { get; } + + [DataMember(Name = "_index")] + public string IndexName { get; } + + [DataMember(Name = "pipeline")] + public string Pipeline { get; } + } } } diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs index 656cc8a5..06de4ae2 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkOptions.cs @@ -15,6 +15,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Runtime.Serialization; using Elasticsearch.Net; using Serilog.Core; using Serilog.Events; @@ -113,6 +114,12 @@ public class ElasticsearchSinkOptions /// public string TypeName { get; set; } + /// + /// Configures the being used when bulk indexing documents. + /// In order to use data streams, this needs to be set to OpType.Create. + /// + public ElasticOpType BatchAction { get; set; } = ElasticOpType.Index; + /// /// Function to decide which Pipeline to use for the LogEvent /// @@ -403,4 +410,25 @@ public enum RegisterTemplateRecovery /// FailSink = 8 } + + /// + /// Collection of op_type:s that can be used when indexing documents + /// ‹https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html + /// + /// This is the same as the internal but we don't want to + /// expose it in the API. + /// + public enum ElasticOpType + { + /// + /// Default option, creates or updates a document. + /// + [EnumMember(Value = "index")] Index, + /// + /// Set to create to only index the document if it does not already exist (put if absent). + /// - If a document with the specified _id already exists, the indexing operation will fail. + /// - If the request targets a data stream, an op_type of create is required + /// + [EnumMember(Value = "create")] Create + } } diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs index 79f64e23..a605ea0b 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs @@ -74,10 +74,6 @@ private ElasticsearchSinkState(ElasticsearchSinkOptions options) { options.TypeName = "_doc"; } - else - { - if (string.IsNullOrWhiteSpace(options.TypeName)) throw new ArgumentException("options.TypeName"); - } _templateName = options.TemplateName; _templateMatchString = IndexFormatRegex.Replace(options.IndexFormat, @"$1*$2"); diff --git a/test/Serilog.Sinks.Elasticsearch.Tests/BulkActionTests.cs b/test/Serilog.Sinks.Elasticsearch.Tests/BulkActionTests.cs new file mode 100644 index 00000000..b7c0c927 --- /dev/null +++ b/test/Serilog.Sinks.Elasticsearch.Tests/BulkActionTests.cs @@ -0,0 +1,90 @@ +using System; +using System.Linq; +using FluentAssertions; +using Serilog.Events; +using Serilog.Parsing; +using Xunit; + +namespace Serilog.Sinks.Elasticsearch.Tests +{ + public class BulkActionTests : ElasticsearchSinkTestsBase + { + [Fact] + public void DefaultBulkActionV7() + { + _options.IndexFormat = "logs"; + _options.TypeName = "_doc"; + _options.PipelineName = null; + using (var sink = new ElasticsearchSink(_options)) + { + sink.Emit(ADummyLogEvent()); + sink.Emit(ADummyLogEvent()); + } + + var bulkJsonPieces = this.AssertSeenHttpPosts(_seenHttpPosts, 2, 1); + const string expectedAction = @"{""index"":{""_type"":""_doc"",""_index"":""logs""}}"; + bulkJsonPieces[0].Should().Be(expectedAction); + } + + [Fact] + public void DefaultBulkActionV8() + { + _options.IndexFormat = "logs"; + _options.TypeName = null; + _options.PipelineName = null; + using (var sink = new ElasticsearchSink(_options)) + { + sink.Emit(ADummyLogEvent()); + sink.Emit(ADummyLogEvent()); + } + + var bulkJsonPieces = this.AssertSeenHttpPosts(_seenHttpPosts, 2, 1); + const string expectedAction = @"{""index"":{""_index"":""logs""}}"; + bulkJsonPieces[0].Should().Be(expectedAction); + } + + [Fact] + public void BulkActionDataStreams() + { + _options.IndexFormat = "logs-my-stream"; + _options.TypeName = null; + _options.PipelineName = null; + _options.BatchAction = ElasticOpType.Create; + + using (var sink = new ElasticsearchSink(_options)) + { + sink.Emit(ADummyLogEvent()); + sink.Emit(ADummyLogEvent()); + } + + var bulkJsonPieces = this.AssertSeenHttpPosts(_seenHttpPosts, 2, 1); + const string expectedAction = @"{""create"":{""_index"":""logs-my-stream""}}"; + bulkJsonPieces[0].Should().Be(expectedAction); + } + + [Fact] + public void PipelineAction() + { + _options.IndexFormat = "logs-my-stream"; + _options.TypeName = "_doc"; + _options.PipelineName = "my-pipeline"; + _options.BatchAction = ElasticOpType.Index; + + using (var sink = new ElasticsearchSink(_options)) + { + sink.Emit(ADummyLogEvent()); + sink.Emit(ADummyLogEvent()); + } + + var bulkJsonPieces = this.AssertSeenHttpPosts(_seenHttpPosts, 2, 1); + const string expectedAction = @"{""index"":{""_type"":""_doc"",""_index"":""logs-my-stream"",""pipeline"":""my-pipeline""}}"; + bulkJsonPieces[0].Should().Be(expectedAction); + } + + private static LogEvent ADummyLogEvent() { + return new LogEvent(DateTimeOffset.Now, LogEventLevel.Information, null, + new MessageTemplate("A template", Enumerable.Empty()), + Enumerable.Empty()); + } + } +} \ No newline at end of file diff --git a/test/Serilog.Sinks.Elasticsearch.Tests/ElasticsearchSinkTestsBase.cs b/test/Serilog.Sinks.Elasticsearch.Tests/ElasticsearchSinkTestsBase.cs index 719c7e7c..6fc6c7a3 100644 --- a/test/Serilog.Sinks.Elasticsearch.Tests/ElasticsearchSinkTestsBase.cs +++ b/test/Serilog.Sinks.Elasticsearch.Tests/ElasticsearchSinkTestsBase.cs @@ -102,9 +102,9 @@ protected async Task ThrowAsync() throw new Exception("boom!"); } - protected string[] AssertSeenHttpPosts(List _seenHttpPosts, int lastN) + protected string[] AssertSeenHttpPosts(List _seenHttpPosts, int lastN, int expectedNumberOfRequests = 2) { - _seenHttpPosts.Should().NotBeEmpty().And.HaveCount(2); + _seenHttpPosts.Should().NotBeEmpty().And.HaveCount(expectedNumberOfRequests); var json = string.Join("", _seenHttpPosts); var bulkJsonPieces = json.Split(new[] { '\n' }, StringSplitOptions.RemoveEmptyEntries); From f4011700b6f9df0ad120823cd99f82448e29d08f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Sj=C3=B6holm?= Date: Mon, 14 Sep 2020 13:16:09 +0200 Subject: [PATCH 2/5] Consider op_type when parsing response from bulk action --- .../Sinks/ElasticSearch/ElasticSearchSink.cs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs index 1487cb37..f0460336 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs @@ -204,10 +204,17 @@ private void HandleResponse(IEnumerable events, DynamicResponse result if (result.Success && result.Body?["errors"] == true) { var indexer = 0; + var opType = _state.Options.BatchAction == ElasticOpType.Create + ? "create" + : "index"; var items = result.Body["items"]; foreach (var item in items) { - if (item["index"] != null && HasProperty(item["index"], "error") && item["index"]["error"] != null) + var action = item.ContainsKey(opType) + ? item[opType] + : null; + + if (action != null && action.ContainsKey("error")) { var e = events.ElementAt(indexer); if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToSelfLog)) @@ -216,8 +223,8 @@ private void HandleResponse(IEnumerable events, DynamicResponse result SelfLog.WriteLine( "Failed to store event with template '{0}' into Elasticsearch. Elasticsearch reports for index {1} the following: {2}", e.MessageTemplate, - item["index"]["_index"], - _state.Serialize(item["index"]["error"])); + action["_index"], + _state.Serialize(action["error"])); } if (_state.Options.EmitEventFailure.HasFlag(EmitEventFailureHandling.WriteToFailureSink) && @@ -251,7 +258,6 @@ private void HandleResponse(IEnumerable events, DynamicResponse result _state.Options.FailureCallback); } } - } indexer++; } From dd671ea586e0e0b1f91f3830646329efa0c62268 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Sj=C3=B6holm?= Date: Mon, 14 Sep 2020 13:24:05 +0200 Subject: [PATCH 3/5] Don't overwrite TypeName if it has been set to null In order to be able to use templates and at the same time remove the doc type we shouldn't override it if it's set to null. --- .../Sinks/ElasticSearch/ElasticsearchSinkState.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs index a605ea0b..a1560c9f 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticsearchSinkState.cs @@ -69,8 +69,8 @@ private ElasticsearchSinkState(ElasticsearchSinkOptions options) if (string.IsNullOrWhiteSpace(options.IndexFormat)) throw new ArgumentException("options.IndexFormat"); if (string.IsNullOrWhiteSpace(options.TemplateName)) throw new ArgumentException("options.TemplateName"); - // Strip type argument if ESv7 since multiple types are not supported anymore - if (options.AutoRegisterTemplateVersion == AutoRegisterTemplateVersion.ESv7) + // Since TypeName is deprecated we shouldn't set it, if has been deliberately set to null. + if (options.TypeName == null && options.AutoRegisterTemplateVersion == AutoRegisterTemplateVersion.ESv7) { options.TypeName = "_doc"; } From eeb0a024661c79ff737ebb367a58a34272d816c2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Sj=C3=B6holm?= Date: Thu, 17 Sep 2020 22:37:10 +0200 Subject: [PATCH 4/5] Support for setting the op_type for durable sink Use the same logic for creating bulk action within the durable sink as in the standard sink. --- .../Elasticsearch/DurableElasticsearchSink.cs | 10 ++-- .../Elasticsearch/ElasticsearchLogClient.cs | 11 ++-- .../ElasticsearchPayloadReader.cs | 26 +++++----- .../Sinks/ElasticSearch/ElasticSearchSink.cs | 51 ++++++++++++------- 4 files changed, 57 insertions(+), 41 deletions(-) diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/DurableElasticsearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/DurableElasticsearchSink.cs index 9ac18737..45c1da86 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/DurableElasticsearchSink.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/DurableElasticsearchSink.cs @@ -15,7 +15,6 @@ using System; using System.Collections.Generic; using System.Text; -using Elasticsearch.Net; using Serilog.Core; using Serilog.Events; @@ -55,15 +54,16 @@ public DurableElasticsearchSink(ElasticsearchSinkOptions options) var elasticSearchLogClient = new ElasticsearchLogClient( - elasticLowLevelClient: _state.Client, - cleanPayload: _state.Options.BufferCleanPayload); + elasticLowLevelClient: _state.Client, + cleanPayload: _state.Options.BufferCleanPayload, + elasticOpType: _state.Options.BatchAction); var payloadReader = new ElasticsearchPayloadReader( pipelineName: _state.Options.PipelineName, typeName:_state.Options.TypeName, serialize:_state.Serialize, - getIndexForEvent: _state.GetBufferedIndexForEvent - ); + getIndexForEvent: _state.GetBufferedIndexForEvent, + elasticOpType: _state.Options.BatchAction); _shipper = new ElasticsearchLogShipper( bufferBaseFilename: _state.Options.BufferBaseFilename, diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/ElasticsearchLogClient.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/ElasticsearchLogClient.cs index 8c6a74af..b3573e77 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/ElasticsearchLogClient.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/Durable/Elasticsearch/ElasticsearchLogClient.cs @@ -1,9 +1,6 @@ using System; using System.Collections.Generic; using System.Linq; -using System.Runtime.ExceptionServices; -using System.Text; -using System.Text.RegularExpressions; using System.Threading.Tasks; using Elasticsearch.Net; using Serilog.Debugging; @@ -17,17 +14,21 @@ public class ElasticsearchLogClient : ILogClient> { private readonly IElasticLowLevelClient _elasticLowLevelClient; private readonly Func _cleanPayload; + private readonly ElasticOpType _elasticOpType; /// /// /// /// /// + /// public ElasticsearchLogClient(IElasticLowLevelClient elasticLowLevelClient, - Func cleanPayload) + Func cleanPayload, + ElasticOpType elasticOpType) { _elasticLowLevelClient = elasticLowLevelClient; _cleanPayload = cleanPayload; + _elasticOpType = elasticOpType; } public async Task SendPayloadAsync(List payload) @@ -85,7 +86,7 @@ private InvalidResult GetInvalidPayloadAsync(DynamicResponse baseResult, List> private readonly string _typeName; private readonly Func _serialize; private readonly Func _getIndexForEvent; + private readonly ElasticOpType _elasticOpType; private List _payload; private int _count; private DateTime _date; @@ -28,12 +28,15 @@ public class ElasticsearchPayloadReader: APayloadReader> /// /// /// - public ElasticsearchPayloadReader(string pipelineName,string typeName, Func serialize,Func getIndexForEvent) + /// + public ElasticsearchPayloadReader(string pipelineName, string typeName, Func serialize, + Func getIndexForEvent, ElasticOpType elasticOpType) { _pipelineName = pipelineName; _typeName = typeName; _serialize = serialize; _getIndexForEvent = getIndexForEvent; + _elasticOpType = elasticOpType; } /// @@ -80,18 +83,13 @@ protected override List FinishPayLoad() protected override void AddToPayLoad(string nextLine) { var indexName = _getIndexForEvent(nextLine, _date); - var action = default(object); + var action = ElasticsearchSink.CreateElasticAction( + opType: _elasticOpType, + indexName: indexName, pipelineName: _pipelineName, + id: _count + "_" + Guid.NewGuid(), + mappingType: _typeName); + var actionJson = LowLevelRequestResponseSerializer.Instance.SerializeToString(action); - if (string.IsNullOrWhiteSpace(_pipelineName)) - { - action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid() } }; - } - else - { - action = new { index = new { _index = indexName, _type = _typeName, _id = _count + "_" + Guid.NewGuid(), pipeline = _pipelineName } }; - } - - var actionJson = _serialize(action); _payload.Add(actionJson); _payload.Add(nextLine); _count++; diff --git a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs index f0460336..d695ed80 100644 --- a/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs +++ b/src/Serilog.Sinks.Elasticsearch/Sinks/ElasticSearch/ElasticSearchSink.cs @@ -82,7 +82,7 @@ protected override async Task EmitBatchAsync(IEnumerable events) if (events == null || !events.Any()) return Task.FromResult(default(T)); - var payload = CreatePlayLoad(events); + var payload = CreatePlayLoad(events); return _state.Client.BulkAsync(PostData.MultiJson(payload)); } @@ -97,7 +97,7 @@ protected override async Task EmitBatchAsync(IEnumerable events) if (events == null || !events.Any()) return null; - var payload = CreatePlayLoad(events); + var payload = CreatePlayLoad(events); return _state.Client.Bulk(PostData.MultiJson(payload)); } @@ -165,8 +165,7 @@ private static bool HasProperty(dynamic settings, string name) return settings.GetType().GetProperty(name) != null; } - private IEnumerable CreatePlayLoad(IEnumerable events) - where T : class, IElasticsearchResponse, new() + private IEnumerable CreatePlayLoad(IEnumerable events) { if (!_state.TemplateRegistrationSuccess && _state.Options.RegisterTemplateFailure == RegisterTemplateRecovery.FailSink) { @@ -179,15 +178,11 @@ private IEnumerable CreatePlayLoad(IEnumerable events) var indexName = _state.GetIndexForEvent(e, e.Timestamp.ToUniversalTime()); var pipelineName = _state.Options.PipelineNameDecider?.Invoke(e) ?? _state.Options.PipelineName; - var actionPayload = new ElasticActionPayload( - indexName, - string.IsNullOrWhiteSpace(pipelineName) ? null : pipelineName, - _state.Options.TypeName - ); - - var action = _state.Options.BatchAction == ElasticOpType.Create - ? (object) new ElasticCreateAction(actionPayload) - : new ElasticIndexAction(actionPayload); + var action = CreateElasticAction( + opType: _state.Options.BatchAction, + indexName: indexName, + pipelineName: pipelineName, + mappingType: _state.Options.TypeName); payload.Add(LowLevelRequestResponseSerializer.Instance.SerializeToString(action)); var sw = new StringWriter(); @@ -204,9 +199,7 @@ private void HandleResponse(IEnumerable events, DynamicResponse result if (result.Success && result.Body?["errors"] == true) { var indexer = 0; - var opType = _state.Options.BatchAction == ElasticOpType.Create - ? "create" - : "index"; + var opType = BulkAction(_state.Options.BatchAction); var items = result.Body["items"]; foreach (var item in items) { @@ -267,6 +260,26 @@ private void HandleResponse(IEnumerable events, DynamicResponse result HandleException(result.OriginalException, events); } } + + internal static string BulkAction(ElasticOpType elasticOpType) => + elasticOpType == ElasticOpType.Create + ? "create" + : "index"; + + internal static object CreateElasticAction(ElasticOpType opType, string indexName, string pipelineName = null, string id = null, string mappingType = null) + { + var actionPayload = new ElasticActionPayload( + indexName: indexName, + pipeline: string.IsNullOrWhiteSpace(pipelineName) ? null : pipelineName, + id: id, + mappingType: mappingType + ); + + var action = opType == ElasticOpType.Create + ? (object) new ElasticCreateAction(actionPayload) + : new ElasticIndexAction(actionPayload); + return action; + } sealed class ElasticCreateAction { @@ -291,10 +304,11 @@ public ElasticIndexAction(ElasticActionPayload payload) } sealed class ElasticActionPayload { - public ElasticActionPayload(string indexName, string pipeline = null, string mappingType = null) + public ElasticActionPayload(string indexName, string pipeline = null, string id = null, string mappingType = null) { IndexName = indexName; Pipeline = pipeline; + Id = id; MappingType = mappingType; } @@ -306,6 +320,9 @@ public ElasticActionPayload(string indexName, string pipeline = null, string map [DataMember(Name = "pipeline")] public string Pipeline { get; } + + [DataMember(Name = "_id")] + public string Id { get; } } } } From 5f361b031f38da4df152432a71078f1163f017de Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=96rjan=20Sj=C3=B6holm?= Date: Fri, 18 Sep 2020 00:42:27 +0200 Subject: [PATCH 5/5] Added change log --- CHANGES.md | 17 +++++++++++++++++ README.md | 2 ++ 2 files changed, 19 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 48f92802..7d4a853f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,6 +1,23 @@ ## Changelog * Disable dot-escaping for field names, because ELK already supports dots in field names. +* Support for explicitly setting `Options.TypeName` to `null` this will remove the + deprecated `_type` from the bulk payload being sent to Elastic. Earlier an exception was + thrown if the `Options.TypeName` was `null`. _If you're using `AutoRegisterTemplateVersion.ESv7` + we'll not force the type to `_doc` if it's set to `null`. This is a small step towards support + for writing logs to Elastic v8. #345 +* Support for setting the Elastic `op_type` e.g. `index` or `create` for bulk actions. + This is a requirement for writing to [data streams](https://www.elastic.co/guide/en/elasticsearch/reference/7.9/data-streams.html) + that's only supporting `create`. Data streams is a more slipped stream way to handle rolling + indices, that previous required an ILM, template and a magic write alias. Now it's more integrated + in Elasticsearch and Kibana. If you're running Elastic `7.9` you'll get rolling indices out of the box + with this configuration: + ``` + TypeName = null, + IndexFormat = "logs-my-stream", + BatchAction = ElasticOpType.Create, + ``` + _Note: that current templates doesn't support data streams._ #355 8.2 * Allow the use of templateCustomSettings when reading from settings json (#315) diff --git a/README.md b/README.md index d3a9dd37..8344482a 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,7 @@ This example shows the options that are currently available when using the appSe + @@ -179,6 +180,7 @@ In your `appsettings.json` file, under the `Serilog` node, : "typeName": "myCustomLogEventType", "pipelineName": "myCustomPipelineName", "batchPostingLimit": 50, + "batchAction": "create", "period": 2, "inlineFields": true, "restrictedToMinimumLevel": "Warning",