Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Commit

Permalink
Wip upgrade to 2.6.x √ (#293)
Browse files Browse the repository at this point in the history
* Work-in-progress on updating to Akka 2.6

* Upgraded to Akka 2.6.5

* Adding MODULE$ to SystemMaterializer$ reflection configuration

Co-authored-by: Viktor Klang (√) <[email protected]>

* Adding Akka+Node as ci test target

Co-authored-by: Peter Vlugter <[email protected]>
  • Loading branch information
viktorklang and pvlugter authored May 5, 2020
1 parent 4c02476 commit 5565fa8
Show file tree
Hide file tree
Showing 20 changed files with 238 additions and 264 deletions.
236 changes: 109 additions & 127 deletions build.sbt

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -42,43 +42,6 @@ final class Target_io_netty_util_internal_PlatformDependent0 {
private static long ADDRESS_FIELD_OFFSET;
}

@TargetClass(
className =
"io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess")
final
class Target_io_grpc_netty_shaded_io_netty_util_internal_shaded_org_jctools_util_UnsafeRefArrayAccess {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayIndexShift, declClass = Object[].class)
public static int REF_ELEMENT_SHIFT;
}

@TargetClass(className = "io.grpc.netty.shaded.io.netty.util.internal.CleanerJava6")
final class Target_io_grpc_netty_shaded_io_netty_util_internal_CleanerJava6 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.DirectByteBuffer",
name = "cleaner")
private static long CLEANER_FIELD_OFFSET;
}

@TargetClass(className = "io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent")
final class Target_io_grpc_netty_shaded_io_netty_util_internal_PlatformDependent {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayBaseOffset, declClass = byte[].class)
private static long BYTE_ARRAY_BASE_OFFSET;
}

@TargetClass(className = "io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0")
final class Target_io_grpc_netty_shaded_io_netty_util_internal_PlatformDependent0 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.Buffer",
name = "address")
private static long ADDRESS_FIELD_OFFSET;
}

@TargetClass(className = "org.agrona.concurrent.AbstractConcurrentArrayQueue")
final class Target_org_agrona_concurrent_AbstractConcurrentArrayQueue {
@Alias
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ import java.util.concurrent.ConcurrentHashMap
final class ProtobufGeneratedMessageRegisterFeature extends Feature {
private[this] final val cache = ConcurrentHashMap.newKeySet[String]
final val messageClasses = Vector(
classOf[akka.protobuf.GeneratedMessage],
classOf[akka.protobuf.GeneratedMessage.Builder[_]],
classOf[akka.protobuf.ProtocolMessageEnum],
classOf[com.google.protobuf.GeneratedMessageV3],
classOf[com.google.protobuf.GeneratedMessageV3.Builder[_]],
classOf[com.google.protobuf.ProtocolMessageEnum],
Expand All @@ -28,6 +25,7 @@ final class ProtobufGeneratedMessageRegisterFeature extends Feature {
if subtype != null && cache.add(subtype.getName)
} {
RuntimeReflection.register(subtype)
// TODO check if we only need to register `parseFrom` and `toByteArray`
subtype.getPackage.getName match {
case "akka.cluster.protobuf.msg" | "com.google.protobuf" | "akka.cluster.ddata.protobuf"
if !subtype.isInterface =>
Expand Down
6 changes: 2 additions & 4 deletions project/GraalVMPlugin.scala
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,8 @@ object GraalVMPlugin extends AutoPlugin {
if (dep.data.isFile) Some(dep)
else {
projectArts.find { art =>
(art.get(sbt.Keys.artifact.key), dep.get(sbt.Keys.artifact.key)) match {
case (Some(l), Some(r)) =>
l.name == r.name && l.classifier == r.classifier
case _ => false
art.get(sbt.Keys.artifact.key).zip(dep.get(sbt.Keys.artifact.key)) exists {
case (l, r) => l.name == r.name && l.classifier == r.classifier
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,9 @@
name: "akka.serialization.SerializationExtension$"
fields: [{name: "MODULE$"}]
}
{
name: "akka.serialization.Serialization"
}
{
name: "akka.util.ByteString$ByteString1"
}
Expand All @@ -279,19 +282,66 @@
name: "akka.util.ByteString$ByteStrings"
}
{
name: "akka.serialization.NullSerializer$"
allDeclaredConstructors: true
name: "akka.serialization.NullSerializer$"
allDeclaredConstructors: true
}
{
name: "akka.serialization.JavaSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.BooleanSerializer"
allDeclaredConstructors: true
}
{
name: "java.util.concurrent.TimeoutException"
}
{
name: "akka.serialization.JavaSerializer"
allDeclaredConstructors: true
name: "scala.Boolean"
}
{
name: "akka.serialization.BooleanSerializer"
allDeclaredConstructors: true
name: "java.lang.Boolean"
}
{
name: "akka.serialization.ByteArraySerializer"
allDeclaredConstructors: true
name: "akka.serialization.ByteArraySerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.IntSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.LongSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.StringSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.ByteStringSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.DisabledJavaSerializer"
allDeclaredConstructors: true
}
{
name: "java.lang.Throwable"
}
{
name: "akka.actor.Address"
}
{
name: "akka.Done"
}
{
name: "akka.NotUsed"
}
{
name: "akka.actor.PoisonPill$"
}
{
name: "akka.actor.Kill$"
}
]
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,10 @@
name: "akka.cluster.AutoDowning"
methods: [{name: "<init>", parameterTypes: ["akka.actor.ActorSystem"]}]
}
{
name: "akka.cluster.NoDowning"
methods: [{name: "<init>", parameterTypes: ["akka.actor.ActorSystem"]}]
}
{
name: "akka.cluster.protobuf.ClusterMessageSerializer"
allDeclaredConstructors: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
allDeclaredFields: true
allDeclaredConstructors: true
}
{
name: "akka.remote.RemoteWatcher$Heartbeat$"
}
{
name: "akka.remote.RemoteScope"
}
Expand Down Expand Up @@ -69,4 +72,11 @@
name: "akka.remote.serialization.IntSerializer"
allDeclaredConstructors: true
}
{
name: "akka.remote.serialization.ThrowableNotSerializableException"
}
{
name: "akka.remote.UniqueAddress"
}

]
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,10 @@
{
name: "akka.stream.impl.StreamSupervisor"
}
{
name: "akka.stream.SystemMaterializer$"
fields: [{name: "MODULE$"}]
}
{
name: "akka.stream.impl.fusing.ActorGraphInterpreter"
allDeclaredFields: true
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Args = -H:ReflectionConfigurationResources=${.}/reflect-config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[
{
"name": "com.typesafe.sslconfig.ssl.NoopHostnameVerifier"
allDeclaredConstructors: true
}
]
12 changes: 7 additions & 5 deletions proxy/core/src/main/resources/cloudstate-common.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
akka {
actor {
provider = cluster
allow-java-serialization = off
warn-about-java-serializer-usage = off
internal-dispatcher = akka.actor.default-dispatcher
serializers {
crdt-serializers = "io.cloudstate.proxy.crdt.CrdtSerializers"
proto-any = "io.cloudstate.proxy.ProtobufAnySerializer"
Expand Down Expand Up @@ -37,8 +40,8 @@ akka {
// Avoid Netty being loaded
enabled-transports = []
artery {
enabled = on
transport = tcp
// FIXME do we need to set canonical.hostname?
// FIXME do we need to set advanced.tcp.connection-timeout?
canonical.port = ${?REMOTING_PORT}
bind.port = ${?REMOTING_PORT}
}
Expand All @@ -47,16 +50,15 @@ akka {
cluster {
shutdown-after-unsuccessful-join-seed-nodes = 60s


sharding.state-store-mode = ddata

// Non-durable for now, since we can't get native-image to work with lmdb right now
sharding.distributed-data.durable.keys = []

// fixme Of course, this is not ideal, but not much choice at the moment.
auto-down-unreachable-after = 30s

sharding {
rebalance-interval = 5s
passivate-idle-entity-after = off // FIXME put in a good value here
}

// Native image doesn't support JMX
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import akka.cluster.Cluster
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import akka.pattern.{BackoffOpts, BackoffSupervisor}
import akka.stream.ActorMaterializer
import akka.stream.SystemMaterializer
import org.slf4j.LoggerFactory
import sun.misc.Signal

Expand Down Expand Up @@ -140,7 +140,7 @@ object CloudStateProxyMain {
}

implicit val system = configuration.fold(ActorSystem("cloudstate-proxy"))(c => ActorSystem("cloudstate-proxy", c))
implicit val materializer = ActorMaterializer()
implicit val materializer = SystemMaterializer(system)
import system.dispatcher

val c = system.settings.config.getConfig("cloudstate.proxy")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import akka.cluster.singleton.{
ClusterSingletonProxySettings
}
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import com.google.protobuf.DescriptorProtos
import com.google.protobuf.Descriptors.{FileDescriptor, ServiceDescriptor}
import com.typesafe.config.Config
Expand Down Expand Up @@ -119,7 +119,7 @@ object EntityDiscoveryManager {
}
}

def props(config: Configuration)(implicit mat: ActorMaterializer): Props =
def props(config: Configuration)(implicit mat: Materializer): Props =
Props(new EntityDiscoveryManager(config))

final case object Ready // Responds with true / false
Expand All @@ -138,7 +138,7 @@ object EntityDiscoveryManager {
}

class EntityDiscoveryManager(config: EntityDiscoveryManager.Configuration)(
implicit mat: ActorMaterializer
implicit mat: Materializer
) extends Actor
with ActorLogging {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package io.cloudstate.proxy.eventing

import akka.{Done, NotUsed}
import akka.actor.Cancellable
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import akka.stream.{FlowShape, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import io.cloudstate.protocol.entity.{ClientAction, EntityDiscoveryClient, Failure, Reply, UserFunctionError}
import io.cloudstate.proxy.{Serve, UserFunctionRouter}
Expand Down Expand Up @@ -71,7 +71,7 @@ object EventingManager {
else List(EventMapping(entity, endpoints))
}

def createSupport(eventConfig: Config)(implicit materializer: ActorMaterializer): Option[EventingSupport] =
def createSupport(eventConfig: Config)(implicit materializer: Materializer): Option[EventingSupport] =
eventConfig.getString("support") match {
case "none" =>
log.info("Eventing support turned off in configuration")
Expand Down
Loading

0 comments on commit 5565fa8

Please sign in to comment.