Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable deserialization of old Akka cluster messages (mixed pekko/akka cluster) #1578

Open
wants to merge 1 commit into
base: main
Choose a base branch
from

Conversation

sadekmunawar
Copy link
Contributor

Forming a cluster with Akka nodes requires the deserialization of cluster messages sent by the Akka. This commit fixes the following exception that occurs during deserialization.

[akka://[email protected]:2551] with serializer id [5] and manifest [akka.cluster.InternalClusterAction$InitJoinAck].
java.lang.IllegalArgumentException: Unknown manifest [akka.cluster.InternalClusterAction$InitJoinAck]
    at org.apache.pekko.cluster.protobuf.ClusterMessageSerializer.fromBinary(ClusterMessageSerializer.scala:156)

case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
val updatedManifest = {
if (manifest.startsWith("akka"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only needed when migrating from Akka, may be better under a boolean guard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, @mdedetrich once added one, and then is can be if (guard && manifest.startsWith("akka")), WDYT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can provide a PR for this based on checking pekko.remote.accept-protocol-names config. That config is an array value and if "akka" is in the array then we can allow this check. We only need to do this config once so the boolean result can be stored as a val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. If we decide to keep the changes in this PR, then having boolean guard would be better.

@@ -177,6 +177,14 @@ class ClusterMessageSerializerSpec extends PekkoSpec("pekko.actor.provider = clu
ClusterMessageSerializer.OldWelcomeManifest)
}

"be de-serializable with class manifests from Akka nodes" in {
val oldAkkaJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sadekmunawar this class only needs to be supported for Akka prior to v2.6.4.

Do we really need to support clusters that are running with very old Akka releases? Ideally we would only support pretty recent Akka releases. It is a pity that Akka changed the cluster messages in a patch release (2.6.5).

// Kept for one version iteration from Akka 2.6.4 to allow rolling migration to short manifests
// can be removed in Akka 2.6.6 or later.
val OldJoinManifest = s"org.apache.pekko.cluster.InternalClusterAction$$Join"
val OldWelcomeManifest = s"org.apache.pekko.cluster.InternalClusterAction$$Welcome"
val OldLeaveManifest = s"org.apache.pekko.cluster.ClusterUserAction$$Leave"
val OldDownManifest = s"org.apache.pekko.cluster.ClusterUserAction$$Down"
val OldInitJoinManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoin$$"
val OldInitJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck"
val OldInitJoinNackManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinNack"
val HeartBeatManifestPre2523 = s"org.apache.pekko.cluster.ClusterHeartbeatSender$$Heartbeat"
val HeartBeatRspManifest2523 = s"org.apache.pekko.cluster.ClusterHeartbeatSender$$HeartbeatRsp"
val OldExitingConfirmedManifest = s"org.apache.pekko.cluster.InternalClusterAction$$ExitingConfirmed"
val OldGossipStatusManifest = "org.apache.pekko.cluster.GossipStatus"
val OldGossipEnvelopeManifest = "org.apache.pekko.cluster.GossipEnvelope"
val OldClusterRouterPoolManifest = "org.apache.pekko.cluster.routing.ClusterRouterPool"

@pjfanning
Copy link
Contributor

I have updated https://cwiki.apache.org/confluence/display/PEKKO/Pekko+Akka+Compatibility and include the fact the we only support forming clusters with Akka nodes of version 2.6.5 and above.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants