Skip to content

Commit

Permalink
Handle mixed akka/pekko protocol names
Browse files Browse the repository at this point in the history
  • Loading branch information
mdedetrich committed Oct 22, 2023
1 parent f1f6285 commit 9a5c35a
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,8 @@ abstract class RemotingFeaturesSpec(val multiNodeConfig: RemotingFeaturesConfig)
remotePath,
Nobody,
None,
None)
None,
Set("pekko", "akka"))

rar.start()
rar
Expand Down
14 changes: 14 additions & 0 deletions remote/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,20 @@ pekko {
# is 'off'. Set this to 'off' to suppress these.
warn-unsafe-watch-outside-cluster = on

# When receiving requests from other remote actors, what are the valid
# prefix's to check against. Useful for when dealing with rolling cluster
# migrations with compatible systems such as Lightbend's Akka.
accept-protocol-names = ["pekko", "akka"]

# The protocol name to use when sending requests to other remote actors.
# Useful when dealing with rolling migration, i.e. temporarily change
# the protocol name to match another compatible actor implementation
# such as Lightbend's "akka" (whilst making sure accept-protocol-names
# contains "akka") so that you can gracefully migrate all nodes to Apache
# Pekko and then change the protocol-name back to "pekko" once all
# nodes have been are running on Apache Pekko
protocol-name = "pekko"

# Settings for the Phi accrual failure detector (http://www.jaist.ac.jp/~defago/files/pdf/IS_RR_2004_010.pdf
# [Hayashibara et al]) used for remote death watch.
# The default PhiAccrualFailureDetector will trigger if there are no heartbeats within
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,13 @@ object BoundAddressesExtension extends ExtensionId[BoundAddressesExtension] with

class BoundAddressesExtension(val system: ExtendedActorSystem) extends Extension {

private val remoteSettings: RemoteSettings = new RemoteSettings(system.settings.config)

/**
* Returns a mapping from a protocol to a set of bound addresses.
*/
def boundAddresses: Map[String, Set[Address]] = system.provider.asInstanceOf[RemoteActorRefProvider].transport match {
case artery: ArteryTransport => Map(ArteryTransport.ProtocolName -> Set(artery.bindAddress.address))
case artery: ArteryTransport => Map(remoteSettings.ProtocolName -> Set(artery.bindAddress.address))
case remoting: Remoting => remoting.boundAddresses
case other => throw new IllegalStateException(s"Unexpected transport type: ${other.getClass}")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,8 @@ private[pekko] class RemoteActorRefProvider(
val rpath =
(RootActorPath(address) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements)
.withUid(path.uid)
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d),
remoteSettings.AcceptProtocolNames)
} else {
warnIfNotRemoteActorRef(path)
local.actorOf(system, props, supervisor, path, systemService, deployment.headOption, false, async)
Expand All @@ -488,7 +489,8 @@ private[pekko] class RemoteActorRefProvider(
RootActorPath(address),
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.error(e, "No root guardian at [{}]", address)
Expand All @@ -513,7 +515,8 @@ private[pekko] class RemoteActorRefProvider(
RootActorPath(address) / elems,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand Down Expand Up @@ -555,7 +558,8 @@ private[pekko] class RemoteActorRefProvider(
rootPath,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand All @@ -578,7 +582,8 @@ private[pekko] class RemoteActorRefProvider(
path,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = remoteSettings.AcceptProtocolNames)
} catch {
case NonFatal(e) =>
log.warning("Error while resolving ActorRef [{}] due to [{}]", path, e.getMessage)
Expand Down Expand Up @@ -672,18 +677,26 @@ private[pekko] class RemoteActorRef private[pekko] (
val path: ActorPath,
val getParent: InternalActorRef,
props: Option[Props],
deploy: Option[Deploy])
deploy: Option[Deploy],
val acceptProtocolNames: Set[String])
extends InternalActorRef
with RemoteRef {

if (path.address.hasLocalScope)
throw new IllegalArgumentException(s"Unexpected local address in RemoteActorRef [$this]")

remote match {
case t: ArteryTransport =>
// detect mistakes such as using "pekko.tcp" with Artery
if (path.address.protocol != t.localAddress.address.protocol)
throw new IllegalArgumentException(s"Wrong protocol of [$path], expected [${t.localAddress.address.protocol}]")
case _: ArteryTransport =>
// detect mistakes such as using "pekko.tcp" with Artery, also handles pekko.remote.accept-protocol-names
if (!acceptProtocolNames.contains(path.address.protocol)) {
val expectedString = if (acceptProtocolNames.size == 1)
"expected"
else
"expected one of"

throw new IllegalArgumentException(
s"Wrong protocol of [$path], $expectedString [${acceptProtocolNames.mkString}]")
}
case _ =>
}
@volatile private[remote] var cachedAssociation: artery.Association = null
Expand All @@ -697,7 +710,8 @@ private[pekko] class RemoteActorRef private[pekko] (
s.headOption match {
case None => this
case Some("..") => getParent.getChild(name)
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None)
case _ => new RemoteActorRef(remote, localAddressToUse, path / s, Nobody, props = None, deploy = None,
acceptProtocolNames = acceptProtocolNames)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ final class RemoteSettings(val config: Config) {
@deprecated("Classic remoting is deprecated, use Artery", "Akka 2.6.0")
val Adapters: Map[String, String] = configToMap(getConfig("pekko.remote.classic.adapters"))

val ProtocolName: String = getString("pekko.remote.protocol-name")

val AcceptProtocolNames: Set[String] =
immutableSeq(getStringList("pekko.remote.accept-protocol-names")).toSet.requiring(_.nonEmpty,
"accept-protocol-names must be non empty")

private def transportNames: immutable.Seq[String] =
immutableSeq(getStringList("pekko.remote.classic.enabled-transports"))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -393,12 +393,12 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
val (port, boundPort) = bindInboundStreams()

_localAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Canonical.Hostname, port),
Address(provider.remoteSettings.ProtocolName, system.name, settings.Canonical.Hostname, port),
AddressUidExtension(system).longAddressUid)
_addresses = Set(_localAddress.address)

_bindAddress = UniqueAddress(
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
Address(provider.remoteSettings.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
AddressUidExtension(system).longAddressUid)

flightRecorder.transportUniqueAddressSet(_localAddress)
Expand Down Expand Up @@ -954,8 +954,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
*/
private[remote] object ArteryTransport {

val ProtocolName = "pekko"

// Note that the used version of the header format for outbound messages is defined in
// `ArterySettings.Version` because that may depend on configuration settings.
// This is the highest supported version on receiving (decoding) side.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,12 @@ private[remote] class PekkoProtocolSettings(config: Config) {
.getMillisDuration("pekko.remote.classic.handshake-timeout")
.requiring(_ > Duration.Zero, "handshake-timeout must be > 0")
}

val PekkoScheme: String = new RemoteSettings(config).ProtocolName
}

@nowarn("msg=deprecated")
private[remote] object PekkoProtocolTransport { // Couldn't these go into the Remoting Extension/ RemoteSettings instead?
val PekkoScheme: String = "pekko"
val PekkoOverhead: Int = 0 // Don't know yet
val UniqueId = new java.util.concurrent.atomic.AtomicInteger(0)

Expand Down Expand Up @@ -122,7 +123,7 @@ private[remote] class PekkoProtocolTransport(
private val codec: PekkoPduCodec)
extends ActorTransportAdapter(wrappedTransport, system) {

override val addedSchemeIdentifier: String = PekkoScheme
override val addedSchemeIdentifier: String = new RemoteSettings(system.settings.config).ProtocolName

override def managementCommand(cmd: Any): Future[Boolean] = wrappedTransport.managementCommand(cmd)

Expand Down Expand Up @@ -229,8 +230,9 @@ private[remote] class PekkoProtocolHandle(
_wrappedHandle: AssociationHandle,
val handshakeInfo: HandshakeInfo,
private val stateActor: ActorRef,
private val codec: PekkoPduCodec)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, PekkoScheme) {
private val codec: PekkoPduCodec,
override val addedSchemeIdentifier: String)
extends AbstractTransportAdapterHandle(_localAddress, _remoteAddress, _wrappedHandle, addedSchemeIdentifier) {

override def write(payload: ByteString): Boolean = wrappedHandle.write(codec.constructPayload(payload))

Expand Down Expand Up @@ -713,7 +715,8 @@ private[remote] class ProtocolStateActor(
wrappedHandle,
handshakeInfo,
self,
codec))
codec,
settings.PekkoScheme))
readHandlerPromise.future
}

Expand All @@ -733,7 +736,8 @@ private[remote] class ProtocolStateActor(
wrappedHandle,
handshakeInfo,
self,
codec)))
codec,
settings.PekkoScheme)))
readHandlerPromise.future
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,8 @@ pekko.actor.warn-about-java-serializer-usage = off
extinctPath,
Nobody,
props = None,
deploy = None)
deploy = None,
acceptProtocolNames = Set("pekko", "akka"))

val probe = TestProbe()
probe.watch(extinctRef)
Expand Down

0 comments on commit 9a5c35a

Please sign in to comment.