Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable deserialization of old Akka cluster messages (mixed pekko/akka cluster) #1578

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,35 +125,43 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem)
throw new IllegalArgumentException(s"Can't serialize object of type ${obj.getClass} in [${getClass.getName}]")
}

def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case HeartbeatManifest => deserializeHeartBeat(bytes)
case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes)
case GossipStatusManifest => deserializeGossipStatus(bytes)
case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case InitJoinManifest => deserializeInitJoin(bytes)
case InitJoinAckManifest => deserializeInitJoinAck(bytes)
case InitJoinNackManifest => deserializeInitJoinNack(bytes)
case JoinManifest => deserializeJoin(bytes)
case WelcomeManifest => deserializeWelcome(bytes)
case LeaveManifest => deserializeLeave(bytes)
case DownManifest => deserializeDown(bytes)
case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
// needs to stay in Akka 2.6.5 to be able to talk to an Akka 2.5.{3,4} node during rolling upgrade
case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes)
case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes)
case OldGossipStatusManifest => deserializeGossipStatus(bytes)
case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case OldInitJoinManifest => deserializeInitJoin(bytes)
case OldInitJoinAckManifest => deserializeInitJoinAck(bytes)
case OldInitJoinNackManifest => deserializeInitJoinNack(bytes)
case OldJoinManifest => deserializeJoin(bytes)
case OldWelcomeManifest => deserializeWelcome(bytes)
case OldLeaveManifest => deserializeLeave(bytes)
case OldDownManifest => deserializeDown(bytes)
case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
val updatedManifest = {
if (manifest.startsWith("akka"))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is only needed when migrating from Akka, may be better under a boolean guard.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIRC, @mdedetrich once added one, and then is can be if (guard && manifest.startsWith("akka")), WDYT

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can provide a PR for this based on checking pekko.remote.accept-protocol-names config. That config is an array value and if "akka" is in the array then we can allow this check. We only need to do this config once so the boolean result can be stored as a val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. If we decide to keep the changes in this PR, then having boolean guard would be better.

manifest.replaceFirst("akka", "org.apache.pekko")
else manifest
}

updatedManifest match {
case HeartbeatManifest => deserializeHeartBeat(bytes)
case HeartbeatRspManifest => deserializeHeartBeatResponse(bytes)
case GossipStatusManifest => deserializeGossipStatus(bytes)
case GossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case InitJoinManifest => deserializeInitJoin(bytes)
case InitJoinAckManifest => deserializeInitJoinAck(bytes)
case InitJoinNackManifest => deserializeInitJoinNack(bytes)
case JoinManifest => deserializeJoin(bytes)
case WelcomeManifest => deserializeWelcome(bytes)
case LeaveManifest => deserializeLeave(bytes)
case DownManifest => deserializeDown(bytes)
case ExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case ClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
// needs to stay in Akka 2.6.5 to be able to talk to an Akka 2.5.{3,4} node during rolling upgrade
case HeartBeatManifestPre2523 => deserializeHeartBeatAsAddress(bytes)
case HeartBeatRspManifest2523 => deserializeHeartBeatRspAsUniqueAddress(bytes)
case OldGossipStatusManifest => deserializeGossipStatus(bytes)
case OldGossipEnvelopeManifest => deserializeGossipEnvelope(bytes)
case OldInitJoinManifest => deserializeInitJoin(bytes)
case OldInitJoinAckManifest => deserializeInitJoinAck(bytes)
case OldInitJoinNackManifest => deserializeInitJoinNack(bytes)
case OldJoinManifest => deserializeJoin(bytes)
case OldWelcomeManifest => deserializeWelcome(bytes)
case OldLeaveManifest => deserializeLeave(bytes)
case OldDownManifest => deserializeDown(bytes)
case OldExitingConfirmedManifest => deserializeExitingConfirmed(bytes)
case OldClusterRouterPoolManifest => deserializeClusterRouterPool(bytes)
case _ => throw new IllegalArgumentException(s"Unknown manifest [$manifest]")
}
}

def compress(msg: MessageLite): Array[Byte] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,14 @@ class ClusterMessageSerializerSpec extends PekkoSpec("pekko.actor.provider = clu
ClusterMessageSerializer.OldWelcomeManifest)
}

"be de-serializable with class manifests from Akka nodes" in {
val oldAkkaJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sadekmunawar this class only needs to be supported for Akka prior to v2.6.4.

Do we really need to support clusters that are running with very old Akka releases? Ideally we would only support pretty recent Akka releases. It is a pity that Akka changed the cluster messages in a patch release (2.6.5).

// Kept for one version iteration from Akka 2.6.4 to allow rolling migration to short manifests
// can be removed in Akka 2.6.6 or later.
val OldJoinManifest = s"org.apache.pekko.cluster.InternalClusterAction$$Join"
val OldWelcomeManifest = s"org.apache.pekko.cluster.InternalClusterAction$$Welcome"
val OldLeaveManifest = s"org.apache.pekko.cluster.ClusterUserAction$$Leave"
val OldDownManifest = s"org.apache.pekko.cluster.ClusterUserAction$$Down"
val OldInitJoinManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoin$$"
val OldInitJoinAckManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinAck"
val OldInitJoinNackManifest = s"org.apache.pekko.cluster.InternalClusterAction$$InitJoinNack"
val HeartBeatManifestPre2523 = s"org.apache.pekko.cluster.ClusterHeartbeatSender$$Heartbeat"
val HeartBeatRspManifest2523 = s"org.apache.pekko.cluster.ClusterHeartbeatSender$$HeartbeatRsp"
val OldExitingConfirmedManifest = s"org.apache.pekko.cluster.InternalClusterAction$$ExitingConfirmed"
val OldGossipStatusManifest = "org.apache.pekko.cluster.GossipStatus"
val OldGossipEnvelopeManifest = "org.apache.pekko.cluster.GossipEnvelope"
val OldClusterRouterPoolManifest = "org.apache.pekko.cluster.routing.ClusterRouterPool"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would be worried that if we need to support the old Akka messages as well as the newer format that changes will also need to made to toBinary too because that would possibly leak class names with pekko package names that Akka nodes will not understand.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I upgraded from Akka-2.6.2, so this change was necessary. But updating the documentation to advise users to perform the rolling upgrade from a version higher than 2.6.4 may be sufficient.

An alternative approach would be to revert the old manifest back to Akka (e.g. val OldJoinManifest = s"akka.cluster.InternalClusterAction$$Join"). No version of Pekko is expected to use the old manifest in the cluster messages, right?

val address = Address("akka", "system", "some.host.org", 4711)
checkDeserializationWithManifest(
InternalClusterAction.InitJoinAck(address, CompatibleConfig(ConfigFactory.empty)),
oldAkkaJoinAckManifest)
}

"add a default data center role to gossip if none is present" in {
val env = roundtrip(GossipEnvelope(a1.uniqueAddress, d1.uniqueAddress, Gossip(SortedSet(a1, d1))))
env.gossip.members.head.roles should be(Set(ClusterSettings.DcRolePrefix + "default"))
Expand Down
Loading