From f4584419283efb9d48e19ac79e2a045c996c9670 Mon Sep 17 00:00:00 2001 From: Kirill Rakhman Date: Tue, 8 May 2018 13:25:12 +0200 Subject: [PATCH] Add maxBatchSize parameter to IntervalBatcher Fixes #58 --- .../CollectorBatchConfiguration.cs | 6 +- .../PipelinedCollectorBatchConfiguration.cs | 8 ++- .../Pipeline/Batch/IntervalBatcher.cs | 18 +++++- .../Util/EnumerableExtensions.cs | 55 +++++++++++++++++++ 4 files changed, 80 insertions(+), 7 deletions(-) create mode 100644 src/InfluxDB.Collector/Util/EnumerableExtensions.cs diff --git a/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs b/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs index b40f113..f662d42 100644 --- a/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs +++ b/src/InfluxDB.Collector/Configuration/CollectorBatchConfiguration.cs @@ -4,6 +4,8 @@ namespace InfluxDB.Collector.Configuration { public abstract class CollectorBatchConfiguration { - public abstract CollectorConfiguration AtInterval(TimeSpan interval); + public CollectorConfiguration AtInterval(TimeSpan interval) => AtInterval(interval, 5000); + + public abstract CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize); } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs b/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs index ffbd60f..00eca5f 100644 --- a/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs +++ b/src/InfluxDB.Collector/Configuration/PipelinedCollectorBatchConfiguration.cs @@ -8,6 +8,7 @@ class PipelinedCollectorBatchConfiguration : CollectorBatchConfiguration { readonly CollectorConfiguration _configuration; TimeSpan? _interval; + int? _maxBatchSize; public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration) { @@ -15,9 +16,10 @@ public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration _configuration = configuration; } - public override CollectorConfiguration AtInterval(TimeSpan interval) + public override CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize) { _interval = interval; + _maxBatchSize = maxBatchSize; return _configuration; } @@ -29,9 +31,9 @@ public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose) return parent; } - var batcher = new IntervalBatcher(_interval.Value, parent); + var batcher = new IntervalBatcher(_interval.Value, _maxBatchSize, parent); dispose = batcher.Dispose; return batcher; } } -} +} \ No newline at end of file diff --git a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs index c7ccfe5..521ca6f 100644 --- a/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs +++ b/src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs @@ -1,8 +1,10 @@ using System; using System.Collections.Generic; +using System.Linq; using System.Threading.Tasks; using InfluxDB.Collector.Diagnostics; using InfluxDB.Collector.Platform; +using InfluxDB.Collector.Util; namespace InfluxDB.Collector.Pipeline.Batch { @@ -12,6 +14,7 @@ class IntervalBatcher : IPointEmitter, IDisposable Queue _queue = new Queue(); readonly TimeSpan _interval; + readonly int? _maxBatchSize; readonly IPointEmitter _parent; readonly object _stateLock = new object(); @@ -19,10 +22,11 @@ class IntervalBatcher : IPointEmitter, IDisposable bool _unloading; bool _started; - public IntervalBatcher(TimeSpan interval, IPointEmitter parent) + public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent) { _parent = parent; _interval = interval; + _maxBatchSize = maxBatchSize; _timer = new PortableTimer(cancel => OnTick()); } @@ -60,7 +64,17 @@ Task OnTick() _queue = new Queue(); } - _parent.Emit(batch.ToArray()); + if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value) + { + _parent.Emit(batch.ToArray()); + } + else + { + foreach (var chunk in batch.Batch(_maxBatchSize.Value)) + { + _parent.Emit(chunk.ToArray()); + } + } } catch (Exception ex) { diff --git a/src/InfluxDB.Collector/Util/EnumerableExtensions.cs b/src/InfluxDB.Collector/Util/EnumerableExtensions.cs new file mode 100644 index 0000000..aab0236 --- /dev/null +++ b/src/InfluxDB.Collector/Util/EnumerableExtensions.cs @@ -0,0 +1,55 @@ +using System.Collections.Generic; +using System.Linq; + +namespace InfluxDB.Collector.Util +{ + internal static class EnumerableExtensions + { + // Copied from https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs + // Original license below + // + // MoreLINQ - Extensions to LINQ to Objects + // Copyright (c) 2009 Atif Aziz. All rights reserved. + // + // Licensed under the Apache License, Version 2.0 (the "License"); + // you may not use this file except in compliance with the License. + // You may obtain a copy of the License at + // + // http://www.apache.org/licenses/LICENSE-2.0 + // + // Unless required by applicable law or agreed to in writing, software + // distributed under the License is distributed on an "AS IS" BASIS, + // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + // See the License for the specific language governing permissions and + // limitations under the License. + public static IEnumerable> Batch(this IEnumerable source, int size) + { + TSource[] bucket = null; + var count = 0; + + foreach (var item in source) + { + if (bucket == null) + { + bucket = new TSource[size]; + } + + bucket[count++] = item; + if (count != size) + { + continue; + } + + yield return bucket; + + bucket = null; + count = 0; + } + + if (bucket != null && count > 0) + { + yield return bucket.Take(count); + } + } + } +} \ No newline at end of file