Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix serialization verification problem with Akka.IO messages #4974

16 changes: 9 additions & 7 deletions src/core/Akka/Actor/ActorCell.cs
Original file line number Diff line number Diff line change
Expand Up @@ -519,10 +519,16 @@ private Envelope SerializeAndDeserialize(Envelope envelope)
if (unwrapped is INoSerializationVerificationNeeded)
return envelope;

if(unwrapped.GetType().Namespace?.StartsWith("System.Net.Sockets") ?? false)
return envelope;
object deserializedMsg;
try
{
deserializedMsg = SerializeAndDeserializePayload(unwrapped);
}
catch (Exception e)
{
throw new SerializationException($"Failed to serialize and deserialize payload object [{unwrapped.GetType()}]. Envelope: [{envelope}]", e);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Record the actor type too

}

var deserializedMsg = SerializeAndDeserializePayload(unwrapped);
if (deadLetter != null)
return new Envelope(new DeadLetter(deserializedMsg, deadLetter.Sender, deadLetter.Recipient), envelope.Sender);
return new Envelope(deserializedMsg, envelope.Sender);
Expand All @@ -544,10 +550,6 @@ private object SerializeAndDeserializePayload(object obj)
var manifest = Serialization.Serialization.ManifestFor(serializer, obj);
return _systemImpl.Serialization.Deserialize(bytes, serializer.Identifier, manifest);
}
catch (Exception e)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move the exception handling to the caller function so we can also capture the envelope

{
throw new SerializationException($"Failed to serialize and deserialize payload object {obj.GetType()}", e);
}
finally
{
Serialization.Serialization.CurrentTransportInformation = oldInfo;
Expand Down
64 changes: 37 additions & 27 deletions src/core/Akka/IO/TcpListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ private IEnumerable<SocketAsyncEventArgs> Accept(int limit)
{
var self = Self;
var saea = new SocketAsyncEventArgs();
saea.Completed += (s, e) => self.Tell(e);
saea.Completed += (s, e) => self.Tell(new SocketEvent(e));
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrap naked SocketAsyncEventArgs in a struct

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (!_socket.AcceptAsync(saea))
Self.Tell(saea);
Self.Tell(new SocketEvent(saea));
yield return saea;
}
}
Expand All @@ -85,34 +85,34 @@ protected override SupervisorStrategy SupervisorStrategy()

protected override bool Receive(object message)
{
if (message is SocketAsyncEventArgs)
switch (message)
{
var saea = message as SocketAsyncEventArgs;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));
saea.AcceptSocket = null;
case SocketEvent evt:
var saea = evt.Args;
if (saea.SocketError == SocketError.Success)
Context.ActorOf(Props.Create<TcpIncomingConnection>(_tcp, saea.AcceptSocket, _bind.Handler, _bind.Options, _bind.PullMode).WithDeploy(Deploy.Local));
saea.AcceptSocket = null;

if (!_socket.AcceptAsync(saea))
Self.Tell(saea);
return true;
}
var resumeAccepting = message as Tcp.ResumeAccepting;
if (resumeAccepting != null)
{
_acceptLimit = resumeAccepting.BatchSize;
_saeas = Accept(_acceptLimit).ToArray();
return true;
}
if (message is Tcp.Unbind)
{
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;
if (!_socket.AcceptAsync(saea))
Self.Tell(new SocketEvent(saea));
return true;

case Tcp.ResumeAccepting resumeAccepting:
_acceptLimit = resumeAccepting.BatchSize;
_saeas = Accept(_acceptLimit).ToArray();
return true;

case Tcp.Unbind _:
_log.Debug("Unbinding endpoint {0}", _bind.LocalAddress);
_socket.Dispose();
Sender.Tell(Tcp.Unbound.Instance);
_log.Debug("Unbound endpoint {0}, stopping listener", _bind.LocalAddress);
Context.Stop(Self);
return true;

default:
return false;
}
return false;
}

/// <summary>
Expand All @@ -130,5 +130,15 @@ protected override void PostStop()
_log.Debug("Error closing ServerSocketChannel: {0}", e);
}
}

private struct SocketEvent : INoSerializationVerificationNeeded
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SocketAsyncEventArgs wrapper

{
public SocketAsyncEventArgs Args;

public SocketEvent(SocketAsyncEventArgs args)
{
Args = args;
}
}
}
}