Skip to content

Commit

Permalink
Idle CPU fixes (#4476)
Browse files Browse the repository at this point in the history
* added performance counter support to scheduler

* Revert "added performance counter support to scheduler"

This reverts commit 772387e.

* disable dot-netty batching via hard-coding

* added spec to validate overriding default settings

* Revert "disable dot-netty batching via hard-coding"

This reverts commit cfd1f26.

* remove batching stage in its entirety if batching is disabled
  • Loading branch information
Aaronontheweb authored Jun 17, 2020
1 parent 7592725 commit 21736fd
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 5 deletions.
20 changes: 20 additions & 0 deletions src/core/Akka.Remote.Tests/Transport/DotNettyBatchWriterSpecs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Akka.Configuration;
using Akka.Remote.Transport.DotNetty;
using Akka.TestKit;
using DotNetty.Buffers;
Expand Down Expand Up @@ -57,7 +58,26 @@ public DotNettyBatchWriterSpecs(ITestOutputHelper helper) : base(helper)
Flush = new FlushLogger(helper);
}

[Fact]
public void Bugfix4434_should_overwrite_default_BatchWriterSettings()
{
Config c = @"
akka.remote.dot-netty.tcp{
batching{
enabled = false
max-pending-writes = 50
max-pending-bytes = 32k
flush-interval = 10ms
}
}
";
var s = DotNettyTransportSettings.Create(c.GetConfig("akka.remote.dot-netty.tcp"));

s.BatchWriterSettings.EnableBatching.Should().BeFalse();
s.BatchWriterSettings.FlushInterval.Should().NotBe(BatchWriterSettings.DefaultFlushInterval);
s.BatchWriterSettings.MaxPendingBytes.Should().NotBe(BatchWriterSettings.DefaultMaxPendingBytes);
s.BatchWriterSettings.MaxPendingWrites.Should().NotBe(BatchWriterSettings.DefaultMaxPendingWrites);
}

/// <summary>
/// Stay below the write / count and write / byte threshold. Rely on the timer.
Expand Down
3 changes: 2 additions & 1 deletion src/core/Akka.Remote/Transport/DotNetty/DotNettyTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,8 @@ private void SetInitialChannelPipeline(IChannel channel)
}
}

pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings));
if(Settings.BatchWriterSettings.EnableBatching)
pipeline.AddLast("BatchWriter", new BatchWriter(Settings.BatchWriterSettings));
}

private void SetClientPipeline(IChannel channel, Address remoteAddress)
Expand Down
44 changes: 40 additions & 4 deletions src/core/Akka.Remote/Transport/DotNetty/TcpTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,9 @@ protected override void RegisterListener(IChannel channel, IHandleEventListener

protected override AssociationHandle CreateHandle(IChannel channel, Address localAddress, Address remoteAddress)
{
return new TcpAssociationHandle(localAddress, remoteAddress, Transport, channel);
if(Transport.Settings.BatchWriterSettings.EnableBatching)
return new BatchingTcpAssociationHandle(localAddress, remoteAddress, Transport, channel);
return new NonBatchingTcpAssociationHandle(localAddress, remoteAddress, Transport, channel);
}

public override void ChannelInactive(IChannelHandlerContext context)
Expand Down Expand Up @@ -159,11 +161,11 @@ private void InitOutbound(IChannel channel, IPEndPoint socketAddress, object msg
}
}

internal sealed class TcpAssociationHandle : AssociationHandle
internal sealed class BatchingTcpAssociationHandle : AssociationHandle
{
private readonly IChannel _channel;

public TcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel)
public BatchingTcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel)
: base(localAddress, remoteAddress)
{
_channel = channel;
Expand Down Expand Up @@ -193,7 +195,41 @@ public override void Disassociate()
_channel.CloseAsync();
}
}


internal sealed class NonBatchingTcpAssociationHandle : AssociationHandle
{
private readonly IChannel _channel;

public NonBatchingTcpAssociationHandle(Address localAddress, Address remoteAddress, DotNettyTransport transport, IChannel channel)
: base(localAddress, remoteAddress)
{
_channel = channel;
}

public override bool Write(ByteString payload)
{
if (_channel.Open)
{
var data = ToByteBuffer(_channel, payload);
_channel.WriteAndFlushAsync(data);
return true;
}
return false;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static IByteBuffer ToByteBuffer(IChannel channel, ByteString payload)
{
var buffer = Unpooled.WrappedBuffer(payload.ToByteArray());
return buffer;
}

public override void Disassociate()
{
_channel.CloseAsync();
}
}

internal sealed class TcpTransport : DotNettyTransport
{
public TcpTransport(ActorSystem system, Config config) : base(system, config)
Expand Down

0 comments on commit 21736fd

Please sign in to comment.