Skip to content

Commit

Permalink
Fix serialization verification problem with Akka.IO messages (#4974)
Browse files Browse the repository at this point in the history
* Fix serialization verification problem with Akka.IO messages

* Wrap naked SocketAsyncEventArgs in a struct that inherits INoSerializationVerificationNeeded

* Make the wrapper struct readonly

* Expand exception message with their actor types

* Update API Approver list
  • Loading branch information
Arkatufus authored Apr 23, 2021
1 parent 5251f36 commit 442f8eb
Show file tree
Hide file tree
Showing 13 changed files with 91 additions and 49 deletions.
8 changes: 4 additions & 4 deletions src/core/Akka.API.Tests/CoreAPISpec.ApproveCore.approved.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3256,7 +3256,7 @@ namespace Akka.IO
public static Akka.IO.Dns.Resolved Cached(string name, Akka.Actor.ActorSystem system) { }
public override Akka.IO.DnsExt CreateExtension(Akka.Actor.ExtendedActorSystem system) { }
public static Akka.IO.Dns.Resolved ResolveName(string name, Akka.Actor.ActorSystem system, Akka.Actor.IActorRef sender) { }
public abstract class Command
public abstract class Command : Akka.Actor.INoSerializationVerificationNeeded
{
protected Command() { }
}
Expand Down Expand Up @@ -3685,7 +3685,7 @@ namespace Akka.IO
{
protected Event() { }
}
public abstract class Message
public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded
{
protected Message() { }
}
Expand Down Expand Up @@ -3792,7 +3792,7 @@ namespace Akka.IO
{
protected Event() { }
}
public abstract class Message
public abstract class Message : Akka.Actor.INoSerializationVerificationNeeded
{
protected Message() { }
}
Expand Down Expand Up @@ -3827,7 +3827,7 @@ namespace Akka.IO
public static readonly Akka.IO.UdpConnected.SuspendReading Instance;
}
}
public class UdpConnectedExt : Akka.IO.IOExtension
public class UdpConnectedExt : Akka.IO.IOExtension, Akka.Actor.INoSerializationVerificationNeeded
{
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system) { }
public UdpConnectedExt(Akka.Actor.ExtendedActorSystem system, Akka.IO.UdpSettings settings) { }
Expand Down
4 changes: 3 additions & 1 deletion src/core/Akka.Tests/IO/TcpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ class AckWithValue : Tcp.Event

public TcpIntegrationSpec(ITestOutputHelper output)
: base($@"akka.loglevel = DEBUG
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
akka.io.tcp.trace-logging = true
akka.io.tcp.write-commands-queue-max-size = {InternalConnectionActorMaxQueueSize}", output: output)
{ }
Expand Down Expand Up @@ -190,7 +192,7 @@ public void The_TCP_transport_implementation_should_properly_support_connecting_
var targetAddress = new DnsEndPoint("localhost", boundMsg.LocalAddress.AsInstanceOf<IPEndPoint>().Port);
var clientHandler = CreateTestProbe();
Sys.Tcp().Tell(new Tcp.Connect(targetAddress), clientHandler);
clientHandler.ExpectMsg<Tcp.Connected>(TimeSpan.FromMinutes(10));
clientHandler.ExpectMsg<Tcp.Connected>(TimeSpan.FromSeconds(3));
var clientEp = clientHandler.Sender;
clientEp.Tell(new Tcp.Register(clientHandler));
serverHandler.ExpectMsg<Tcp.Connected>();
Expand Down
8 changes: 5 additions & 3 deletions src/core/Akka.Tests/IO/TcpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ namespace Akka.Tests.IO
public class TcpListenerSpec : AkkaSpec
{
public TcpListenerSpec()
: base(@"akka.io.tcp.register-timeout = 500ms
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
akka.io.tcp.register-timeout = 500ms
akka.io.tcp.max-received-message-size = 1024
akka.io.tcp.direct-buffer-size = 512
akka.actor.serialize-creators = on
akka.io.tcp.batch-accept-limit = 2
")
akka.io.tcp.batch-accept-limit = 2")
{ }

[Fact]
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/IO/UdpConnectedIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class UdpConnectedIntegrationSpec : AkkaSpec

public UdpConnectedIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
akka.io.udp-connected.nr-of-selectors = 1
akka.io.udp-connected.direct-buffer-pool-limit = 100
akka.io.udp-connected.direct-buffer-size = 1024
Expand Down
2 changes: 2 additions & 0 deletions src/core/Akka.Tests/IO/UdpIntegrationSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public class UdpIntegrationSpec : AkkaSpec

public UdpIntegrationSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
Expand Down
10 changes: 7 additions & 3 deletions src/core/Akka.Tests/IO/UdpListenerSpec.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,21 @@
using Akka.IO;
using Akka.TestKit;
using Xunit;
using Xunit.Abstractions;
using UdpListener = Akka.IO.UdpListener;

namespace Akka.Tests.IO
{
public class UdpListenerSpec : AkkaSpec
{
public UdpListenerSpec()
: base(@"akka.io.udp.max-channels = unlimited
public UdpListenerSpec(ITestOutputHelper output)
: base(@"
akka.actor.serialize-creators = on
akka.actor.serialize-messages = on
akka.io.udp.max-channels = unlimited
akka.io.udp.nr-of-selectors = 1
akka.io.udp.direct-buffer-pool-limit = 100
akka.io.udp.direct-buffer-size = 1024")
akka.io.udp.direct-buffer-size = 1024", output)
{ }

[Fact]
Expand Down
16 changes: 13 additions & 3 deletions src/core/Akka/Actor/ActorCell.Children.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
using System.Collections.Immutable;
using System.Text;
using System.Runtime.CompilerServices;
using System.Runtime.Serialization;
using System.Threading;
using Akka.Actor.Internal;
using Akka.Dispatch.SysMsg;
using Akka.Serialization;
using Akka.Util;
using Akka.Util.Internal;
Expand Down Expand Up @@ -452,6 +454,7 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s
if (_systemImpl.Settings.SerializeAllCreators && !systemService && !(props.Deploy.Scope is LocalScope))
{
var oldInfo = Serialization.Serialization.CurrentTransportInformation;
object propArgument = null;
try
{
if (oldInfo == null)
Expand All @@ -465,17 +468,24 @@ private IInternalActorRef MakeChild(Props props, string name, bool async, bool s
{
if (argument != null && !(argument is INoSerializationVerificationNeeded))
{
propArgument = argument;
var serializer = ser.FindSerializerFor(argument);
var bytes = serializer.ToBinary(argument);
var ms = Serialization.Serialization.ManifestFor(serializer, argument);
if(ser.Deserialize(bytes, serializer.Identifier, ms) == null)
if (ser.Deserialize(bytes, serializer.Identifier, ms) == null)
throw new ArgumentException(
$"Pre-creation serialization check failed at [${_self.Path}/{name}]",
nameof(name));
$"Pre-creation serialization check failed at [${_self.Path}/{name}]",
nameof(name));
}
}
}
}
catch (Exception e)
{
throw new SerializationException(
$"Failed to serialize and deserialize actor props argument of type {propArgument?.GetType()} for actor type [{props.Type}].",
e);
}
finally
{
Serialization.Serialization.CurrentTransportInformation = oldInfo;
Expand Down
12 changes: 11 additions & 1 deletion src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
using Akka.Dispatch.SysMsg;
using Akka.Event;
using System.Reflection;
using System.Runtime.Serialization;
using Akka.Serialization;
using Akka.Util;
using Assert = System.Diagnostics.Debug;
Expand Down Expand Up @@ -518,7 +519,16 @@ private Envelope SerializeAndDeserialize(Envelope envelope)
if (unwrapped is INoSerializationVerificationNeeded)
return envelope;

var deserializedMsg = SerializeAndDeserializePayload(unwrapped);
object deserializedMsg;
try
{
deserializedMsg = SerializeAndDeserializePayload(unwrapped);
}
catch (Exception e)
{
throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}], Actor type: [{Actor.GetType()}]", e);
}

if (deadLetter != null)
return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender);
return new Envelope(deserializedMsg, envelope.Sender);
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/Dns.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class Dns : ExtensionIdProvider<DnsExt>
/// <summary>
/// TBD
/// </summary>
public abstract class Command
public abstract class Command : INoSerializationVerificationNeeded
{ }

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion src/core/Akka/IO/Tcp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public override TcpExt CreateExtension(ExtendedActorSystem system)

#region internal connection messages

internal abstract class SocketCompleted { }
internal abstract class SocketCompleted : INoSerializationVerificationNeeded { }

internal sealed class SocketSent : SocketCompleted
{
Expand Down
64 changes: 37 additions & 27 deletions src/core/Akka/IO/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ private IEnumerable<SocketAsyncEventArgs> Accept(int limit)
{
var self = Self;
var saea = new SocketAsyncEventArgs();
saea.Completed += (s, e) => self.Tell(e);
saea.Completed += (s, e) => self.Tell(new SocketEvent(e));
if (!_socket.AcceptAsync(saea))
Self.Tell(saea);
Self.Tell(new SocketEvent(saea));
yield return saea;
}
}
Expand All @@ -85,34 +85,34 @@ protected override SupervisorStrategy SupervisorStrategy()

protected override bool Receive(object message)
{
if (message is SocketAsyncEventArgs)
switch (message)
{
var saea = message as SocketAsyncEventArgs;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));
saea.AcceptSocket = null;
case SocketEvent evt:
var saea = evt.Args;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));
saea.AcceptSocket = null;

if (!_socket.AcceptAsync(saea))
Self.Tell(saea);
return true;
}
var resumeAccepting = message as Tcp.ResumeAccepting;
if (resumeAccepting != null)
{
_acceptLimit = resumeAccepting.BatchSize;
_saeas = Accept(_acceptLimit).ToArray();
return true;
}
if (message is Tcp.Unbind)
{
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;
if (!_socket.AcceptAsync(saea))
Self.Tell(new SocketEvent(saea));
return true;

case Tcp.ResumeAccepting resumeAccepting:
_acceptLimit = resumeAccepting.BatchSize;
_saeas = Accept(_acceptLimit).ToArray();
return true;

case Tcp.Unbind _:
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;

default:
return false;
}
return false;
}

/// <summary>
Expand All @@ -130,5 +130,15 @@ protected override void PostStop()
_log.Debug("Error closing ServerSocketChannel: {0}", e);
}
}

private readonly struct SocketEvent : INoSerializationVerificationNeeded
{
public readonly SocketAsyncEventArgs Args;

public SocketEvent(SocketAsyncEventArgs args)
{
Args = args;
}
}
}
}
4 changes: 2 additions & 2 deletions src/core/Akka/IO/Udp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class Udp : ExtensionIdProvider<UdpExt>
{
#region internal connection messages

internal abstract class SocketCompleted { }
internal abstract class SocketCompleted : INoSerializationVerificationNeeded { }

internal sealed class SocketSent : SocketCompleted
{
Expand Down Expand Up @@ -104,7 +104,7 @@ public override UdpExt CreateExtension(ExtendedActorSystem system)
}

/// <summary>The common interface for <see cref="Command"/> and <see cref="Event"/>.</summary>
public abstract class Message { }
public abstract class Message : INoSerializationVerificationNeeded { }

/// <summary>The common type of all commands supported by the UDP implementation.</summary>
public abstract class Command : Message
Expand Down
6 changes: 3 additions & 3 deletions src/core/Akka/IO/UdpConnected.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class UdpConnected : ExtensionIdProvider<UdpConnectedExt>
{
#region internal connection messages

internal abstract class SocketCompleted
internal abstract class SocketCompleted : INoSerializationVerificationNeeded
{
public readonly SocketAsyncEventArgs EventArgs;

Expand Down Expand Up @@ -92,7 +92,7 @@ public override UdpConnectedExt CreateExtension(ExtendedActorSystem system)
/// <summary>
/// The common interface for <see cref="Command"/> and <see cref="Event"/>.
/// </summary>
public abstract class Message { }
public abstract class Message : INoSerializationVerificationNeeded { }

/// <summary>
/// The common type of all commands supported by the UDP implementation.
Expand Down Expand Up @@ -372,7 +372,7 @@ private Disconnected()
/// <summary>
/// TBD
/// </summary>
public class UdpConnectedExt : IOExtension
public class UdpConnectedExt : IOExtension, INoSerializationVerificationNeeded
{
public UdpConnectedExt(ExtendedActorSystem system)
: this(system, UdpSettings.Create(system.Settings.Config.GetConfig("akka.io.udp-connected")))
Expand Down

0 comments on commit 442f8eb

Please sign in to comment.