From 2c8584820879d8f5d60e9fd8b92d6eed09b9d29f Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 28 Dec 2023 03:39:35 +0100 Subject: [PATCH] add extra changes needed to get akka cluster support --- .../org/apache/pekko/cluster/ConfigUtil.scala | 54 +++++++++++++++++++ .../pekko/cluster/SeedNodeProcess.scala | 21 +++++++- 2 files changed, 74 insertions(+), 1 deletion(-) create mode 100644 cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala new file mode 100644 index 00000000000..6c2bd2254e2 --- /dev/null +++ b/cluster/src/main/scala/org/apache/pekko/cluster/ConfigUtil.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.cluster + +import com.typesafe.config.{ Config, ConfigValue, ConfigValueFactory, ConfigValueType } + +import scala.annotation.nowarn + +private[cluster] object ConfigUtil { + + @nowarn("msg=deprecated") + def addAkkaConfig(cfg: Config, akkaVersion: String): Config = { + import scala.collection.JavaConverters._ + val innerSet = cfg.entrySet().asScala + .filter(e => e.getKey.startsWith("pekko.") && e.getValue.valueType() != ConfigValueType.OBJECT) + .map { entry => + entry.getKey.replace("pekko", "akka") -> adjustPackageNameIfNecessary(entry.getValue) + } + var newConfig = cfg + innerSet.foreach { case (key, value) => + newConfig = newConfig.withValue(key, value) + } + newConfig.withValue("akka.version", ConfigValueFactory.fromAnyRef(akkaVersion)) + } + + private def adjustPackageNameIfNecessary(cv: ConfigValue): ConfigValue = { + if (cv.valueType() == ConfigValueType.STRING) { + val str = cv.unwrapped().toString + if (str.startsWith("org.apache.pekko")) { + ConfigValueFactory.fromAnyRef(str.replace("org.apache.pekko", "akka")) + } else { + cv + } + } else { + cv + } + } + +} diff --git a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala index ab090e3615a..fb9eb722c22 100644 --- a/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala +++ b/cluster/src/main/scala/org/apache/pekko/cluster/SeedNodeProcess.scala @@ -47,6 +47,21 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon "Note that disabling it will allow the formation of a cluster with nodes having incompatible configuration settings. " + "This node will be shutdown!" + private lazy val needsAkkaConfig: Boolean = { + context.system.settings.config + .getStringList("pekko.remote.accept-protocol-names") + .contains("akka") + } + + private lazy val akkaVersion: String = { + val cfg = context.system.settings.config + if (cfg.hasPath("akka.version")) { + cfg.getString("akka.version") + } else { + cfg.getString("pekko.cluster.akka.version") + } + } + private def stopOrBecome(behavior: Option[Actor.Receive]): Unit = behavior match { case Some(done) => context.become(done) // JoinSeedNodeProcess @@ -65,8 +80,12 @@ private[cluster] abstract class SeedNodeProcess(joinConfigCompatChecker: JoinCon val configToValidate = JoinConfigCompatChecker.filterWithKeys(requiredNonSensitiveKeys, context.system.settings.config) + val adjustedConfig = if (needsAkkaConfig) + ConfigUtil.addAkkaConfig(configToValidate, akkaVersion) + else configToValidate + seedNodes.foreach { a => - context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(configToValidate) + context.actorSelection(context.parent.path.toStringWithAddress(a)) ! InitJoin(adjustedConfig) } }