From be24c657306361115d5d47573a862d58df86b272 Mon Sep 17 00:00:00 2001 From: Martijn Laarman Date: Tue, 8 Oct 2024 14:16:23 +0200 Subject: [PATCH] Hacking new serialization with Steve at techorama --- ...kRequestCreationForDataStreamBenchmarks.cs | 22 ++--- ...estCreationWithFixedIndexNameBenchmarks.cs | 22 ++--- ...reationWithTemplatedIndexNameBenchmarks.cs | 22 ++--- .../Program.cs | 2 +- build/scripts/CommandLine.fs | 10 +- global.json | 2 +- .../DataStreams/DataStreamChannel.cs | 7 +- ...y.cs => ElasticsearchChannelBase.Bytes.cs} | 94 ++++++++++++++----- .../ElasticsearchChannelBase.Serialization.cs | 38 ++++++++ .../ElasticsearchChannelBase.cs | 21 ++--- .../Indices/IndexChannel.cs | 13 ++- .../Serialization/BulkOperationHeader.cs | 7 ++ 12 files changed, 180 insertions(+), 80 deletions(-) rename src/Elastic.Ingest.Elasticsearch/{Serialization/BulkRequestDataFactory.cs => ElasticsearchChannelBase.Bytes.cs} (74%) create mode 100644 src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs index c44a74a..27f4adc 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationForDataStreamBenchmarks.cs @@ -42,15 +42,15 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader); - var requestData = new RequestData( - POST, "/_bulk", PostData.ReadOnlyMemory(bytes), - _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() - ); - await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, _ => _bulkOperationHeader); + // var requestData = new RequestData( + // POST, "/_bulk", PostData.ReadOnlyMemory(bytes), + // _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() + // ); + // await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs index 0d149d4..c3b1049 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithFixedIndexNameBenchmarks.cs @@ -41,15 +41,15 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true)); - var requestData = new RequestData( - POST, "/_bulk", PostData.ReadOnlyMemory(bytes), - _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() - ); - await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, true)); + // var requestData = new RequestData( + // POST, "/_bulk", PostData.ReadOnlyMemory(bytes), + // _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() + // ); + // await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs index cb9b80f..cb778b2 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Benchmarks/BulkRequestCreationWithTemplatedIndexNameBenchmarks.cs @@ -40,15 +40,15 @@ public void Setup() _data = StockData.CreateSampleData(DocumentsToIndex); } - [Benchmark(Baseline = true)] - public async Task DynamicIndexName_WriteToStreamAsync() - { - MemoryStream.Position = 0; - var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false)); - var requestData = new RequestData( - POST, "/_bulk", PostData.ReadOnlyMemory(bytes), - _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() - ); - await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); - } + // [Benchmark(Baseline = true)] + // public async Task DynamicIndexName_WriteToStreamAsync() + // { + // MemoryStream.Position = 0; + // var bytes = BulkRequestDataFactory.GetBytes(_data, _options!, e => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(e, _options!, false)); + // var requestData = new RequestData( + // POST, "/_bulk", PostData.ReadOnlyMemory(bytes), + // _transportConfiguration!, null!, ((ITransportConfiguration)_transportConfiguration!).MemoryStreamFactory, new OpenTelemetryData() + // ); + // await requestData.PostData.WriteAsync(MemoryStream, _transportConfiguration!, CancellationToken.None); + // } } diff --git a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs index 2564764..94f10b7 100644 --- a/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs +++ b/benchmarks/Elastic.Ingest.Elasticsearch.Benchmarks/Program.cs @@ -24,7 +24,7 @@ var bm = new BulkRequestCreationWithFixedIndexNameBenchmarks(); bm.Setup(); -await bm.WriteToStreamAsync(); +//await bm.WriteToStreamAsync(); var length = bm.MemoryStream.Length; diff --git a/build/scripts/CommandLine.fs b/build/scripts/CommandLine.fs index 12a4cc5..545f2b0 100644 --- a/build/scripts/CommandLine.fs +++ b/build/scripts/CommandLine.fs @@ -29,11 +29,11 @@ with interface IArgParserTemplate with member this.Usage = match this with - | Clean _ -> "clean known output locations" - | Build _ -> "Run build" - | Test _ -> "Runs build then tests" - | Release _ -> "runs build, tests, and create and validates the packages shy of publishing them" - | Publish _ -> "Runs the full release" + | Clean -> "clean known output locations" + | Build -> "Run build" + | Test -> "Runs build then tests" + | Release -> "runs build, tests, and create and validates the packages shy of publishing them" + | Publish -> "Runs the full release" | SingleTarget _ -> "Runs the provided sub command without running their dependencies" | Token _ -> "Token to be used to authenticate with github" diff --git a/global.json b/global.json index c317b00..789bff3 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "6.0.302", + "version": "8.0.100", "rollForward": "latestFeature", "allowPrerelease": false } diff --git a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs index 44cd75b..d04d59f 100644 --- a/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/DataStreams/DataStreamChannel.cs @@ -29,8 +29,11 @@ public DataStreamChannel(DataStreamChannelOptions options, ICollection - protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => _fixedHeader; + /// + protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.CreateNoParams; + + /// + protected override void MutateHeader(ref readonly BulkHeader header) { } /// protected override string TemplateName => Options.DataStream.GetTemplateName(); diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs similarity index 74% rename from src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs rename to src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs index 8f00950..4db13a1 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkRequestDataFactory.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Bytes.cs @@ -2,36 +2,67 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -#if NETSTANDARD2_1_OR_GREATER +using System; using System.Buffers; -#else using System.Collections.Generic; -#endif -using System; using System.IO; +using System.Linq; using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Ingest.Transport; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; -namespace Elastic.Ingest.Elasticsearch.Serialization; +namespace Elastic.Ingest.Elasticsearch; + +/// TODO +public enum IndexOp +{ + /// + Index, + /// + IndexNoParams, + /// + Create, + /// + CreateNoParams, + /// + Delete, + /// + Update, +} + +/// TODO +public readonly struct BulkHeader +{ + +} /// -/// Provides static factory methods from producing request data for bulk requests. +/// An abstract base class for both and +/// Coordinates most of the sending to- and bootstrapping of Elasticsearch /// -public static class BulkRequestDataFactory +public abstract partial class ElasticsearchChannelBase + : TransportChannelBase + where TChannelOptions : ElasticsearchChannelOptionsBase { + #if NETSTANDARD2_1_OR_GREATER /// /// Get the NDJSON request body bytes for a page of events. /// - /// The type for the event being ingested. /// A page of events. /// The for the channel where the request will be written. /// A function which takes an instance of and produces the operation header containing the action and optional meta data. /// A of representing the entire request body in NDJSON format. - public static ReadOnlyMemory GetBytes(ArraySegment page, + public ReadOnlyMemory GetBytes(ArraySegment page, ElasticsearchChannelOptionsBase options, Func createHeaderFactory) { // ArrayBufferWriter inserts comma's when serializing multiple times @@ -71,19 +102,25 @@ public static ReadOnlyMemory GetBytes(ArraySegment page, } #endif + /// TODO + protected abstract IndexOp GetIndexOp(TEvent @event); + + /// + /// + /// + /// + protected abstract void MutateHeader(ref readonly BulkHeader header); + /// /// Asynchronously write the NDJSON request body for a page of events to . /// - /// The type for the event being ingested. /// A page of events. /// The target for the request. /// The for the channel where the request will be written. - /// A function which takes an instance of and produces the operation header containing the action and optional meta data. /// The cancellation token to cancel operation. /// - public static async Task WriteBufferToStreamAsync(ArraySegment page, Stream stream, - ElasticsearchChannelOptionsBase options, Func createHeaderFactory, - CancellationToken ctx = default) + public async Task WriteBufferToStreamAsync( + ArraySegment page, Stream stream, ElasticsearchChannelOptionsBase options, CancellationToken ctx = default) { #if NETSTANDARD2_1_OR_GREATER var items = page; @@ -99,12 +136,27 @@ public static async Task WriteBufferToStreamAsync(ArraySegment p var @event = items[i]; if (@event == null) continue; - var indexHeader = createHeaderFactory(@event); - await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), SerializerOptions, ctx) - .ConfigureAwait(false); + var op = GetIndexOp(@event); + switch (op) + { + case IndexOp.IndexNoParams: + await SerializePlainIndexHeaderAsync(stream, ctx).ConfigureAwait(false); + break; + case IndexOp.Index: + case IndexOp.Create: + case IndexOp.Delete: + case IndexOp.Update: + var header = new BulkHeader(); + MutateHeader(ref header); + await SerializeHeaderAsync(stream, ref header, SerializerOptions, ctx).ConfigureAwait(false); + break; + + + } + await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); - if (indexHeader is UpdateOperation) + if (op == IndexOp.Update) await stream.WriteAsync(DocUpdateHeaderStart, 0, DocUpdateHeaderStart.Length, ctx).ConfigureAwait(false); if (options.EventWriter?.WriteToStreamAsync != null) @@ -113,7 +165,7 @@ await JsonSerializer.SerializeAsync(stream, indexHeader, indexHeader.GetType(), await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) .ConfigureAwait(false); - if (indexHeader is UpdateOperation) + if (op == IndexOp.Update) await stream.WriteAsync(DocUpdateHeaderEnd, 0, DocUpdateHeaderEnd.Length, ctx).ConfigureAwait(false); await stream.WriteAsync(LineFeed, 0, 1, ctx).ConfigureAwait(false); @@ -123,12 +175,11 @@ await JsonSerializer.SerializeAsync(stream, @event, SerializerOptions, ctx) /// /// Create the bulk operation header with the appropriate action and meta data for a bulk request targeting an index. /// - /// The type for the event being ingested. /// The for which the header will be produced. /// The for the channel. /// Control whether the index name is included in the meta data for the operation. /// A instance. - public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions options, + public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEvent @event, IndexChannelOptions options, bool skipIndexName = false) { var indexTime = options.TimestampLookup?.Invoke(@event) ?? DateTimeOffset.Now; @@ -161,4 +212,3 @@ public static BulkOperationHeader CreateBulkOperationHeaderForIndex(TEve : skipIndexName ? new CreateOperation() : new CreateOperation { Index = index }; } } - diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs new file mode 100644 index 0000000..6358864 --- /dev/null +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.Serialization.cs @@ -0,0 +1,38 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using Elastic.Channels; +using Elastic.Channels.Diagnostics; +using Elastic.Ingest.Elasticsearch.DataStreams; +using Elastic.Ingest.Elasticsearch.Indices; +using Elastic.Ingest.Elasticsearch.Serialization; +using Elastic.Ingest.Transport; +using Elastic.Transport; +using Elastic.Transport.Products.Elasticsearch; +using static Elastic.Ingest.Elasticsearch.ElasticsearchChannelStatics; + +namespace Elastic.Ingest.Elasticsearch; + +/// +/// An abstract base class for both and +/// Coordinates most of the sending to- and bootstrapping of Elasticsearch +/// +public abstract partial class ElasticsearchChannelBase + : TransportChannelBase + where TChannelOptions : ElasticsearchChannelOptionsBase +{ + private Task SerializeHeaderAsync(Stream stream, ref readonly BulkHeader header, JsonSerializerOptions serializerOptions, CancellationToken ctx) => + throw new NotImplementedException(); + + private Task SerializePlainIndexHeaderAsync(Stream stream, CancellationToken ctx) => + throw new NotImplementedException(); +} diff --git a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs index d391a0f..8c6d39e 100644 --- a/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs +++ b/src/Elastic.Ingest.Elasticsearch/ElasticsearchChannelBase.cs @@ -19,6 +19,8 @@ namespace Elastic.Ingest.Elasticsearch; + + /// /// An abstract base class for both and /// Coordinates most of the sending to- and bootstrapping of Elasticsearch @@ -71,12 +73,12 @@ protected override Task ExportAsync(ITransport transport, ArraySeg #if NETSTANDARD2_1 // Option is obsolete to prevent external users to set it. #pragma warning disable CS0618 - if (Options.UseReadOnlyMemory) -#pragma warning restore CS0618 - { - var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); - return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx); - } +// if (Options.UseReadOnlyMemory) +// #pragma warning restore CS0618 +// { +// var bytes = BulkRequestDataFactory.GetBytes(page, Options, CreateBulkOperationHeader); +// return transport.RequestAsync(HttpMethod.POST, BulkUrl, PostData.ReadOnlyMemory(bytes), RequestParams, ctx); +// } #endif #pragma warning disable IDE0022 // Use expression body for method return transport.RequestAsync(HttpMethod.POST, BulkUrl, @@ -85,16 +87,11 @@ protected override Task ExportAsync(ITransport transport, ArraySeg { /* NOT USED */ }, - async (b, stream, ctx) => { await BulkRequestDataFactory.WriteBufferToStreamAsync(b, stream, Options, CreateBulkOperationHeader, ctx).ConfigureAwait(false); }) + async (b, stream, ctx) => { await WriteBufferToStreamAsync(b, stream, Options, ctx).ConfigureAwait(false); }) , RequestParams, ctx); #pragma warning restore IDE0022 // Use expression body for method } - /// - /// Asks implementations to create a based on the being exported. - /// - protected abstract BulkOperationHeader CreateBulkOperationHeader(TEvent @event); - /// protected class HeadIndexTemplateResponse : ElasticsearchResponse { } diff --git a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs index 0e471ed..a8d01ea 100644 --- a/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs +++ b/src/Elastic.Ingest.Elasticsearch/Indices/IndexChannel.cs @@ -15,7 +15,7 @@ namespace Elastic.Ingest.Elasticsearch.Indices; /// public class IndexChannel : ElasticsearchChannelBase> { - private readonly bool _skipIndexNameOnOperations = false; + //private readonly bool _skipIndexNameOnOperations = false; private readonly string _url; /// @@ -31,7 +31,7 @@ public IndexChannel(IndexChannelOptions options, ICollection options, ICollection protected override string BulkUrl => _url; - /// - protected override BulkOperationHeader CreateBulkOperationHeader(TEvent @event) => BulkRequestDataFactory.CreateBulkOperationHeaderForIndex(@event, Options, _skipIndexNameOnOperations); + // TODO implement + + /// + protected override IndexOp GetIndexOp(TEvent @event) => IndexOp.IndexNoParams; + + /// + protected override void MutateHeader(ref readonly BulkHeader header) => throw new NotImplementedException(); /// protected override string TemplateName { get; } diff --git a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs index b5dd325..b66ed31 100644 --- a/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs +++ b/src/Elastic.Ingest.Elasticsearch/Serialization/BulkOperationHeader.cs @@ -8,6 +8,13 @@ namespace Elastic.Ingest.Elasticsearch.Serialization; +/// +/// +/// +public struct BulkOperationHeader2 +{ +} + /// Represents the _bulk operation meta header public abstract class BulkOperationHeader {