diff --git a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt index 1a310d07e9c..2def445471a 100644 --- a/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt +++ b/src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt @@ -3256,7 +3256,7 @@ namespace Akka.IO public static Akka.IO.Dns.Resolved Cached(string name, Akka.Actor.ActorSystem system) { } public override Akka.IO.DnsExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { } public static Akka.IO.Dns.Resolved ResolveName(string name, Akka.Actor.ActorSystem system, Akka.Actor.IActorRef sender) { } - public abstract class Command + public abstract class Command : Akka.Actor.INoSerializationVerificationNeeded { protected Command() { } } @@ -3685,7 +3685,7 @@ namespace Akka.IO { protected Event() { } } - public abstract class Message + public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded { protected Message() { } } @@ -3792,7 +3792,7 @@ namespace Akka.IO { protected Event() { } } - public abstract class Message + public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded { protected Message() { } } @@ -3827,7 +3827,7 @@ namespace Akka.IO public static readonly Akka.IO.UdpConnected.SuspendReading Instance; } } - public class UdpConnectedExt : Akka.IO.IOExtension + public class UdpConnectedExt : Akka.IO.IOExtension, Akka.Actor.INoSerializationVerificationNeeded { public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { } public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { } diff --git a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs index 67937d68e57..cb8abc1c4d3 100644 --- a/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/TcpIntegrationSpec.cs @@ -45,6 +45,8 @@ class AckWithValue : Tcp.Event public TcpIntegrationSpec(ITestOutputHelper output) : base($@"akka.loglevel = DEBUG + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on akka.io.tcp.trace-logging = true akka.io.tcp.write-commands-queue-max-size = {InternalConnectionActorMaxQueueSize}", output: output) { } @@ -190,7 +192,7 @@ public void The_TCP_transport_implementation_should_properly_support_connecting_ var targetAddress = new DnsEndPoint("localhost", boundMsg.LocalAddress.AsInstanceOf().Port); var clientHandler = CreateTestProbe(); Sys.Tcp().Tell(new Tcp.Connect(targetAddress), clientHandler); - clientHandler.ExpectMsg(TimeSpan.FromMinutes(10)); + clientHandler.ExpectMsg(TimeSpan.FromSeconds(3)); var clientEp = clientHandler.Sender; clientEp.Tell(new Tcp.Register(clientHandler)); serverHandler.ExpectMsg(); diff --git a/src/core/Akka.Tests/IO/TcpListenerSpec.cs b/src/core/Akka.Tests/IO/TcpListenerSpec.cs index e5db0fe2674..02c40edbedb 100644 --- a/src/core/Akka.Tests/IO/TcpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/TcpListenerSpec.cs @@ -19,12 +19,14 @@ namespace Akka.Tests.IO public class TcpListenerSpec : AkkaSpec { public TcpListenerSpec() - : base(@"akka.io.tcp.register-timeout = 500ms + : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on + akka.io.tcp.register-timeout = 500ms akka.io.tcp.max-received-message-size = 1024 akka.io.tcp.direct-buffer-size = 512 akka.actor.serialize-creators = on - akka.io.tcp.batch-accept-limit = 2 - ") + akka.io.tcp.batch-accept-limit = 2") { } [Fact] diff --git a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs index e948193f219..3f79b8a8d9c 100644 --- a/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs @@ -24,6 +24,8 @@ public class UdpConnectedIntegrationSpec : AkkaSpec public UdpConnectedIntegrationSpec(ITestOutputHelper output) : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on akka.io.udp-connected.nr-of-selectors = 1 akka.io.udp-connected.direct-buffer-pool-limit = 100 akka.io.udp-connected.direct-buffer-size = 1024 diff --git a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs index a3af3b3d769..a4c69741bdb 100644 --- a/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs +++ b/src/core/Akka.Tests/IO/UdpIntegrationSpec.cs @@ -27,6 +27,8 @@ public class UdpIntegrationSpec : AkkaSpec public UdpIntegrationSpec(ITestOutputHelper output) : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on akka.io.udp.max-channels = unlimited akka.io.udp.nr-of-selectors = 1 akka.io.udp.direct-buffer-pool-limit = 100 diff --git a/src/core/Akka.Tests/IO/UdpListenerSpec.cs b/src/core/Akka.Tests/IO/UdpListenerSpec.cs index 7895c6b4944..7021126af1d 100644 --- a/src/core/Akka.Tests/IO/UdpListenerSpec.cs +++ b/src/core/Akka.Tests/IO/UdpListenerSpec.cs @@ -13,17 +13,21 @@ using Akka.IO; using Akka.TestKit; using Xunit; +using Xunit.Abstractions; using UdpListener = Akka.IO.UdpListener; namespace Akka.Tests.IO { public class UdpListenerSpec : AkkaSpec { - public UdpListenerSpec() - : base(@"akka.io.udp.max-channels = unlimited + public UdpListenerSpec(ITestOutputHelper output) + : base(@" + akka.actor.serialize-creators = on + akka.actor.serialize-messages = on + akka.io.udp.max-channels = unlimited akka.io.udp.nr-of-selectors = 1 akka.io.udp.direct-buffer-pool-limit = 100 - akka.io.udp.direct-buffer-size = 1024") + akka.io.udp.direct-buffer-size = 1024", output) { } [Fact] diff --git a/src/core/Akka/Actor/ActorCell.Children.cs b/src/core/Akka/Actor/ActorCell.Children.cs index 0affbfe94f6..cc38364269d 100644 --- a/src/core/Akka/Actor/ActorCell.Children.cs +++ b/src/core/Akka/Actor/ActorCell.Children.cs @@ -10,8 +10,10 @@ using System.Collections.Immutable; using System.Text; using System.Runtime.CompilerServices; +using System.Runtime.Serialization; using System.Threading; using Akka.Actor.Internal; +using Akka.Dispatch.SysMsg; using Akka.Serialization; using Akka.Util; using Akka.Util.Internal; @@ -452,6 +454,7 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s if (_systemImpl.Settings.SerializeAllCreators && !systemService && !(props.Deploy.Scope is LocalScope)) { var oldInfo = Serialization.Serialization.CurrentTransportInformation; + object propArgument = null; try { if (oldInfo == null) @@ -465,17 +468,24 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s { if (argument != null && !(argument is INoSerializationVerificationNeeded)) { + propArgument = argument; var serializer = ser.FindSerializerFor(argument); var bytes = serializer.ToBinary(argument); var ms = Serialization.Serialization.ManifestFor(serializer, argument); - if(ser.Deserialize(bytes, serializer.Identifier, ms) == null) + if (ser.Deserialize(bytes, serializer.Identifier, ms) == null) throw new ArgumentException( - $"Pre-creation serialization check failed at [${_self.Path}/{name}]", - nameof(name)); + $"Pre-creation serialization check failed at [${_self.Path}/{name}]", + nameof(name)); } } } } + catch (Exception e) + { + throw new SerializationException( + $"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()} for actor type [{props.Type}].", + e); + } finally { Serialization.Serialization.CurrentTransportInformation = oldInfo; diff --git a/src/core/Akka/Actor/ActorCell.cs b/src/core/Akka/Actor/ActorCell.cs index 349ea7420c8..13b5a00e6b7 100644 --- a/src/core/Akka/Actor/ActorCell.cs +++ b/src/core/Akka/Actor/ActorCell.cs @@ -13,6 +13,7 @@ using Akka.Dispatch.SysMsg; using Akka.Event; using System.Reflection; +using System.Runtime.Serialization; using Akka.Serialization; using Akka.Util; using Assert = System.Diagnostics.Debug; @@ -518,7 +519,16 @@ private Envelope SerializeAndDeserialize(Envelope envelope) if (unwrapped is INoSerializationVerificationNeeded) return envelope; - var deserializedMsg = SerializeAndDeserializePayload(unwrapped); + object deserializedMsg; + try + { + deserializedMsg = SerializeAndDeserializePayload(unwrapped); + } + catch (Exception e) + { + throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}], Actor type: [{Actor.GetType()}]", e); + } + if (deadLetter != null) return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender); return new Envelope(deserializedMsg, envelope.Sender); diff --git a/src/core/Akka/IO/Dns.cs b/src/core/Akka/IO/Dns.cs index 143120feedb..b510f51331f 100644 --- a/src/core/Akka/IO/Dns.cs +++ b/src/core/Akka/IO/Dns.cs @@ -59,7 +59,7 @@ public class Dns : ExtensionIdProvider /// /// TBD /// - public abstract class Command + public abstract class Command : INoSerializationVerificationNeeded { } /// diff --git a/src/core/Akka/IO/Tcp.cs b/src/core/Akka/IO/Tcp.cs index f5efd617c8d..99695646f44 100644 --- a/src/core/Akka/IO/Tcp.cs +++ b/src/core/Akka/IO/Tcp.cs @@ -53,7 +53,7 @@ public override TcpExt CreateExtension(ExtendedActorSystem system) #region internal connection messages - internal abstract class SocketCompleted { } + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { } internal sealed class SocketSent : SocketCompleted { diff --git a/src/core/Akka/IO/TcpListener.cs b/src/core/Akka/IO/TcpListener.cs index 9407c1c153c..a26d8f80592 100644 --- a/src/core/Akka/IO/TcpListener.cs +++ b/src/core/Akka/IO/TcpListener.cs @@ -71,9 +71,9 @@ private IEnumerable Accept(int limit) { var self = Self; var saea = new SocketAsyncEventArgs(); - saea.Completed += (s, e) => self.Tell(e); + saea.Completed += (s, e) => self.Tell(new SocketEvent(e)); if (!_socket.AcceptAsync(saea)) - Self.Tell(saea); + Self.Tell(new SocketEvent(saea)); yield return saea; } } @@ -85,34 +85,34 @@ protected override SupervisorStrategy SupervisorStrategy() protected override bool Receive(object message) { - if (message is SocketAsyncEventArgs) + switch (message) { - var saea = message as SocketAsyncEventArgs; - if (saea.SocketError == SocketError.Success) - Context.ActorOf(Props.Create(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); - saea.AcceptSocket = null; + case SocketEvent evt: + var saea = evt.Args; + if (saea.SocketError == SocketError.Success) + Context.ActorOf(Props.Create(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local)); + saea.AcceptSocket = null; - if (!_socket.AcceptAsync(saea)) - Self.Tell(saea); - return true; - } - var resumeAccepting = message as Tcp.ResumeAccepting; - if (resumeAccepting != null) - { - _acceptLimit = resumeAccepting.BatchSize; - _saeas = Accept(_acceptLimit).ToArray(); - return true; - } - if (message is Tcp.Unbind) - { - _log.Debug("Unbinding endpoint {0}", _bind.LocalAddress); - _socket.Dispose(); - Sender.Tell(Tcp.Unbound.Instance); - _log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress); - Context.Stop(Self); - return true; + if (!_socket.AcceptAsync(saea)) + Self.Tell(new SocketEvent(saea)); + return true; + + case Tcp.ResumeAccepting resumeAccepting: + _acceptLimit = resumeAccepting.BatchSize; + _saeas = Accept(_acceptLimit).ToArray(); + return true; + + case Tcp.Unbind _: + _log.Debug("Unbinding endpoint {0}", _bind.LocalAddress); + _socket.Dispose(); + Sender.Tell(Tcp.Unbound.Instance); + _log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress); + Context.Stop(Self); + return true; + + default: + return false; } - return false; } /// @@ -130,5 +130,15 @@ protected override void PostStop() _log.Debug("Error closing ServerSocketChannel: {0}", e); } } + + private readonly struct SocketEvent : INoSerializationVerificationNeeded + { + public readonly SocketAsyncEventArgs Args; + + public SocketEvent(SocketAsyncEventArgs args) + { + Args = args; + } + } } } diff --git a/src/core/Akka/IO/Udp.cs b/src/core/Akka/IO/Udp.cs index b1aea2c2bff..213994ab542 100644 --- a/src/core/Akka/IO/Udp.cs +++ b/src/core/Akka/IO/Udp.cs @@ -34,7 +34,7 @@ public class Udp : ExtensionIdProvider { #region internal connection messages - internal abstract class SocketCompleted { } + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { } internal sealed class SocketSent : SocketCompleted { @@ -104,7 +104,7 @@ public override UdpExt CreateExtension(ExtendedActorSystem system) } /// The common interface for and . - public abstract class Message { } + public abstract class Message : INoSerializationVerificationNeeded { } /// The common type of all commands supported by the UDP implementation. public abstract class Command : Message diff --git a/src/core/Akka/IO/UdpConnected.cs b/src/core/Akka/IO/UdpConnected.cs index 351648c6b77..30b9c213c64 100644 --- a/src/core/Akka/IO/UdpConnected.cs +++ b/src/core/Akka/IO/UdpConnected.cs @@ -34,7 +34,7 @@ public class UdpConnected : ExtensionIdProvider { #region internal connection messages - internal abstract class SocketCompleted + internal abstract class SocketCompleted : INoSerializationVerificationNeeded { public readonly SocketAsyncEventArgs EventArgs; @@ -92,7 +92,7 @@ public override UdpConnectedExt CreateExtension(ExtendedActorSystem system) /// /// The common interface for and . /// - public abstract class Message { } + public abstract class Message : INoSerializationVerificationNeeded { } /// /// The common type of all commands supported by the UDP implementation. @@ -372,7 +372,7 @@ private Disconnected() /// /// TBD /// - public class UdpConnectedExt : IOExtension + public class UdpConnectedExt : IOExtension, INoSerializationVerificationNeeded { public UdpConnectedExt(ExtendedActorSystem system) : this(system, UdpSettings.Create(system.Settings.Config.GetConfig("akka.io.udp-connected")))