Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix serialization verification problem with Akka.IO messages #4974

4 changes: 3 additions & 1 deletion src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class AckWithValue : Tcp.Event

public TcpIntegrationSpec(ITestOutputHelper output)
: base($@"akka.loglevel = DEBUG
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.tcp.trace-logging = true
akka.io.tcp.write-commands-queue-max-size = {InternalConnectionActorMaxQueueSize}", output: output)
{ }
Expand Down Expand Up @@ -190,7 +192,7 @@ public void The_TCP_transport_implementation_should_properly_support_connecting_
var targetAddress = new DnsEndPoint("localhost", boundMsg.LocalAddress.AsInstanceOf<IPEndPoint>().Port);
var clientHandler = CreateTestProbe();
Sys.Tcp().Tell(new Tcp.Connect(targetAddress), clientHandler);
clientHandler.ExpectMsg<Tcp.Connected>(TimeSpan.FromMinutes(10));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A socket shouldn't take 10 minutes to connect

clientHandler.ExpectMsg<Tcp.Connected>(TimeSpan.FromSeconds(3));
var clientEp = clientHandler.Sender;
clientEp.Tell(new Tcp.Register(clientHandler));
serverHandler.ExpectMsg<Tcp.Connected>();
Expand Down
8 changes: 5 additions & 3 deletions src/core/Akka.Tests/IO/TcpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ namespace Akka.Tests.IO
public class TcpListenerSpec : AkkaSpec
{
public TcpListenerSpec()
: base(@"akka.io.tcp.register-timeout = 500ms
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.tcp.register-timeout = 500ms
akka.io.tcp.max-received-message-size = 1024
akka.io.tcp.direct-buffer-size = 512
akka.actor.serialize-creators = on
akka.io.tcp.batch-accept-limit = 2
")
akka.io.tcp.batch-accept-limit = 2")
{ }

[Fact]
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class UdpConnectedIntegrationSpec : AkkaSpec

public UdpConnectedIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.udp-connected.nr-of-selectors = 1
akka.io.udp-connected.direct-buffer-pool-limit = 100
akka.io.udp-connected.direct-buffer-size = 1024
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class UdpIntegrationSpec : AkkaSpec

public UdpIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
Expand Down
10 changes: 7 additions & 3 deletions src/core/Akka.Tests/IO/UdpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
using Akka.IO;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using UdpListener = Akka.IO.UdpListener;

namespace Akka.Tests.IO
{
public class UdpListenerSpec : AkkaSpec
{
public UdpListenerSpec()
: base(@"akka.io.udp.max-channels = unlimited
public UdpListenerSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Leaving this here so we can test similar problems in the future

akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024")
akka.io.udp.direct-buffer-size = 1024", output)
{ }

[Fact]
Expand Down
16 changes: 13 additions & 3 deletions src/core/Akka/Actor/ActorCell.Children.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
using System.Collections.Immutable;
using System.Text;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Threading;
using Akka.Actor.Internal;
using Akka.Dispatch.SysMsg;
using Akka.Serialization;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -452,6 +454,7 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s
if (_systemImpl.Settings.SerializeAllCreators && !systemService && !(props.Deploy.Scope is LocalScope))
{
var oldInfo = Serialization.Serialization.CurrentTransportInformation;
object propArgument = null;
try
{
if (oldInfo == null)
Expand All @@ -465,17 +468,24 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s
{
if (argument != null && !(argument is INoSerializationVerificationNeeded))
{
propArgument = argument;
var serializer = ser.FindSerializerFor(argument);
var bytes = serializer.ToBinary(argument);
var ms = Serialization.Serialization.ManifestFor(serializer, argument);
if(ser.Deserialize(bytes, serializer.Identifier, ms) == null)
if (ser.Deserialize(bytes, serializer.Identifier, ms) == null)
throw new ArgumentException(
$"Pre-creation serialization check failed at [${_self.Path}/{name}]",
nameof(name));
$"Pre-creation serialization check failed at [${_self.Path}/{name}]",
nameof(name));
}
}
}
}
catch (Exception e)
{
throw new SerializationException(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catch and re-throw to add a more meaningful exception message to the error

$"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()}",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's be a good idea to capture the actor type here too

e);
}
finally
{
Serialization.Serialization.CurrentTransportInformation = oldInfo;
Expand Down
8 changes: 8 additions & 0 deletions src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Akka.Dispatch.SysMsg;
using Akka.Event;
using System.Reflection;
using System.Runtime.Serialization;
using Akka.Serialization;
using Akka.Util;
using Assert = System.Diagnostics.Debug;
Expand Down Expand Up @@ -518,6 +519,9 @@ private Envelope SerializeAndDeserialize(Envelope envelope)
if (unwrapped is INoSerializationVerificationNeeded)
return envelope;

if(unwrapped.GetType().Namespace?.StartsWith("System.Net.Sockets") ?? false)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needed to add this to trap System.Net.Sockets.SocketAsyncEventArgs which also get sent as payloads

Copy link
Member

@Aaronontheweb Aaronontheweb Apr 22, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer it if we just wrapped the SocketAsyncEventArgs inside a struct of some kind that implements INoSerializationVerificationNeeded

return envelope;

var deserializedMsg = SerializeAndDeserializePayload(unwrapped);
if (deadLetter != null)
return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender);
Expand All @@ -540,6 +544,10 @@ private object SerializeAndDeserializePayload(object obj)
var manifest = Serialization.Serialization.ManifestFor(serializer, obj);
return _systemImpl.Serialization.Deserialize(bytes, serializer.Identifier, manifest);
}
catch (Exception e)
{
throw new SerializationException($"Failed to serialize and deserialize payload object {obj.GetType()}", e);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Catch and re-throw to add a more meaningful exception message to the error

}
finally
{
Serialization.Serialization.CurrentTransportInformation = oldInfo;
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/Dns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class Dns : ExtensionIdProvider<DnsExt>
/// <summary>
/// TBD
/// </summary>
public abstract class Command
public abstract class Command : INoSerializationVerificationNeeded
{ }

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public override TcpExt CreateExtension(ExtendedActorSystem system)

#region internal connection messages

internal abstract class SocketCompleted { }
internal abstract class SocketCompleted : INoSerializationVerificationNeeded { }

internal sealed class SocketSent : SocketCompleted
{
Expand Down
4 changes: 2 additions & 2 deletions src/core/Akka/IO/Udp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Udp : ExtensionIdProvider<UdpExt>
{
#region internal connection messages

internal abstract class SocketCompleted { }
internal abstract class SocketCompleted : INoSerializationVerificationNeeded { }

internal sealed class SocketSent : SocketCompleted
{
Expand Down Expand Up @@ -104,7 +104,7 @@ public override UdpExt CreateExtension(ExtendedActorSystem system)
}

/// <summary>The common interface for <see cref="Command"/> and <see cref="Event"/>.</summary>
public abstract class Message { }
public abstract class Message : INoSerializationVerificationNeeded { }

/// <summary>The common type of all commands supported by the UDP implementation.</summary>
public abstract class Command : Message
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka/IO/UdpConnected.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class UdpConnected : ExtensionIdProvider<UdpConnectedExt>
{
#region internal connection messages

internal abstract class SocketCompleted
internal abstract class SocketCompleted : INoSerializationVerificationNeeded
{
public readonly SocketAsyncEventArgs EventArgs;

Expand Down Expand Up @@ -92,7 +92,7 @@ public override UdpConnectedExt CreateExtension(ExtendedActorSystem system)
/// <summary>
/// The common interface for <see cref="Command"/> and <see cref="Event"/>.
/// </summary>
public abstract class Message { }
public abstract class Message : INoSerializationVerificationNeeded { }

/// <summary>
/// The common type of all commands supported by the UDP implementation.
Expand Down Expand Up @@ -372,7 +372,7 @@ private Disconnected()
/// <summary>
/// TBD
/// </summary>
public class UdpConnectedExt : IOExtension
public class UdpConnectedExt : IOExtension, INoSerializationVerificationNeeded
{
public UdpConnectedExt(ExtendedActorSystem system)
: this(system, UdpSettings.Create(system.Settings.Config.GetConfig("akka.io.udp-connected")))
Expand Down