Skip to content

Commit

Permalink
Add maxBatchSize parameter to IntervalBatcher
Browse files Browse the repository at this point in the history
  • Loading branch information
cypressious committed May 7, 2018
1 parent 2e8eca8 commit 8a62bb9
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@ namespace InfluxDB.Collector.Configuration
{
public abstract class CollectorBatchConfiguration
{
public abstract CollectorConfiguration AtInterval(TimeSpan interval);
public abstract CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize = null);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,18 @@ class PipelinedCollectorBatchConfiguration : CollectorBatchConfiguration
{
readonly CollectorConfiguration _configuration;
TimeSpan? _interval;
int? _maxBatchSize;

public PipelinedCollectorBatchConfiguration(CollectorConfiguration configuration)
{
if (configuration == null) throw new ArgumentNullException(nameof(configuration));
_configuration = configuration;
}

public override CollectorConfiguration AtInterval(TimeSpan interval)
public override CollectorConfiguration AtInterval(TimeSpan interval, int? maxBatchSize = null)
{
_interval = interval;
_maxBatchSize = maxBatchSize;
return _configuration;
}

Expand All @@ -29,7 +31,7 @@ public IPointEmitter CreateEmitter(IPointEmitter parent, out Action dispose)
return parent;
}

var batcher = new IntervalBatcher(_interval.Value, parent);
var batcher = new IntervalBatcher(_interval.Value, _maxBatchSize, parent);
dispose = batcher.Dispose;
return batcher;
}
Expand Down
18 changes: 16 additions & 2 deletions src/InfluxDB.Collector/Pipeline/Batch/IntervalBatcher.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using InfluxDB.Collector.Diagnostics;
using InfluxDB.Collector.Platform;
using InfluxDB.Collector.Util;

namespace InfluxDB.Collector.Pipeline.Batch
{
Expand All @@ -12,17 +14,19 @@ class IntervalBatcher : IPointEmitter, IDisposable
Queue<PointData> _queue = new Queue<PointData>();

readonly TimeSpan _interval;
readonly int? _maxBatchSize;
readonly IPointEmitter _parent;

readonly object _stateLock = new object();
readonly PortableTimer _timer;
bool _unloading;
bool _started;

public IntervalBatcher(TimeSpan interval, IPointEmitter parent)
public IntervalBatcher(TimeSpan interval, int? maxBatchSize, IPointEmitter parent)
{
_parent = parent;
_interval = interval;
_maxBatchSize = maxBatchSize;
_timer = new PortableTimer(cancel => OnTick());
}

Expand Down Expand Up @@ -60,7 +64,17 @@ Task OnTick()
_queue = new Queue<PointData>();
}

_parent.Emit(batch.ToArray());
if (_maxBatchSize == null || batch.Count <= _maxBatchSize.Value)
{
_parent.Emit(batch.ToArray());
}
else
{
foreach (var chunk in batch.Batch(_maxBatchSize.Value))
{
_parent.Emit(chunk.ToArray());
}
}
}
catch (Exception ex)
{
Expand Down
39 changes: 39 additions & 0 deletions src/InfluxDB.Collector/Util/EnumerableExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using System.Collections.Generic;
using System.Linq;

namespace InfluxDB.Collector.Util
{
internal static class EnumerableExtensions
{
// from https://github.com/morelinq/MoreLINQ/blob/master/MoreLinq/Batch.cs
public static IEnumerable<IEnumerable<TSource>> Batch<TSource>(this IEnumerable<TSource> source, int size)
{
TSource[] bucket = null;
var count = 0;

foreach (var item in source)
{
if (bucket == null)
{
bucket = new TSource[size];
}

bucket[count++] = item;
if (count != size)
{
continue;
}

yield return bucket;

bucket = null;
count = 0;
}

if (bucket != null && count > 0)
{
yield return bucket.Take(count);
}
}
}
}

0 comments on commit 8a62bb9

Please sign in to comment.