From a54c5e96fb6265d0f6a923acf4f2f3eaac82d5d1 Mon Sep 17 00:00:00 2001 From: Sadek Munawar Date: Wed, 27 Nov 2024 15:47:06 -0500 Subject: [PATCH] Enable deserialization of old Akka cluster messages --- .../protobuf/ClusterMessageSerializer.scala | 66 +++++++++++-------- .../ClusterMessageSerializerSpec.scala | 8 +++ 2 files changed, 45 insertions(+), 29 deletions(-) diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala b/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala index 3199896ae75..66862a3ddb0 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializer.scala @@ -125,35 +125,43 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]") } - def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { - case HeartbeatManifest => deserializeHeartBeat(bytes) - case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes) - case GossipStatusManifest => deserializeGossipStatus(bytes) - case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes) - case InitJoinManifest => deserializeInitJoin(bytes) - case InitJoinAckManifest => deserializeInitJoinAck(bytes) - case InitJoinNackManifest => deserializeInitJoinNack(bytes) - case JoinManifest => deserializeJoin(bytes) - case WelcomeManifest => deserializeWelcome(bytes) - case LeaveManifest => deserializeLeave(bytes) - case DownManifest => deserializeDown(bytes) - case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes) - case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) - // needs to stay in Akka 2.6.5 to be able to talk to an Akka 2.5.{3,4} node during rolling upgrade - case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes) - case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes) - case OldGossipStatusManifest => deserializeGossipStatus(bytes) - case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes) - case OldInitJoinManifest => deserializeInitJoin(bytes) - case OldInitJoinAckManifest => deserializeInitJoinAck(bytes) - case OldInitJoinNackManifest => deserializeInitJoinNack(bytes) - case OldJoinManifest => deserializeJoin(bytes) - case OldWelcomeManifest => deserializeWelcome(bytes) - case OldLeaveManifest => deserializeLeave(bytes) - case OldDownManifest => deserializeDown(bytes) - case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes) - case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) - case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]") + def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = { + val updatedManifest = { + if (manifest.startsWith("akka")) + manifest.replaceFirst("akka", "org.apache.pekko") + else manifest + } + + updatedManifest match { + case HeartbeatManifest => deserializeHeartBeat(bytes) + case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes) + case GossipStatusManifest => deserializeGossipStatus(bytes) + case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes) + case InitJoinManifest => deserializeInitJoin(bytes) + case InitJoinAckManifest => deserializeInitJoinAck(bytes) + case InitJoinNackManifest => deserializeInitJoinNack(bytes) + case JoinManifest => deserializeJoin(bytes) + case WelcomeManifest => deserializeWelcome(bytes) + case LeaveManifest => deserializeLeave(bytes) + case DownManifest => deserializeDown(bytes) + case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes) + case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) + // needs to stay in Akka 2.6.5 to be able to talk to an Akka 2.5.{3,4} node during rolling upgrade + case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes) + case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes) + case OldGossipStatusManifest => deserializeGossipStatus(bytes) + case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes) + case OldInitJoinManifest => deserializeInitJoin(bytes) + case OldInitJoinAckManifest => deserializeInitJoinAck(bytes) + case OldInitJoinNackManifest => deserializeInitJoinNack(bytes) + case OldJoinManifest => deserializeJoin(bytes) + case OldWelcomeManifest => deserializeWelcome(bytes) + case OldLeaveManifest => deserializeLeave(bytes) + case OldDownManifest => deserializeDown(bytes) + case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes) + case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes) + case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]") + } } def compress(msg: MessageLite): Array[Byte] = { diff --git a/cluster/src/test/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializerSpec.scala b/cluster/src/test/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializerSpec.scala index 7615de9585c..8891dbf2f3a 100644 --- a/cluster/src/test/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializerSpec.scala +++ b/cluster/src/test/scala/org/apache/pekko/cluster/protobuf/ClusterMessageSerializerSpec.scala @@ -177,6 +177,14 @@ class ClusterMessageSerializerSpec extends PekkoSpec("pekko.actor.provider = clu ClusterMessageSerializer.OldWelcomeManifest) } + "be de-serializable with class manifests from Akka nodes" in { + val oldAkkaJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck" + val address = Address("akka", "system", "some.host.org", 4711) + checkDeserializationWithManifest( + InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)), + oldAkkaJoinAckManifest) + } + "add a default data center role to gossip if none is present" in { val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1)))) env.gossip.members.head.roles should be(Set(ClusterSettings.DcRolePrefix + "default"))