This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

WIP upgrade to 2.6.x √ #293

Merged 6 commits on May 5, 2020. Changes shown from 2 commits.
236 changes: 109 additions & 127 deletions build.sbt

Large diffs are not rendered by default.

@@ -42,43 +42,6 @@ final class Target_io_netty_util_internal_PlatformDependent0 {
private static long ADDRESS_FIELD_OFFSET;
}

@TargetClass(
className =
"io.grpc.netty.shaded.io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess")
final
class Target_io_grpc_netty_shaded_io_netty_util_internal_shaded_org_jctools_util_UnsafeRefArrayAccess {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayIndexShift, declClass = Object[].class)
public static int REF_ELEMENT_SHIFT;
}

@TargetClass(className = "io.grpc.netty.shaded.io.netty.util.internal.CleanerJava6")
final class Target_io_grpc_netty_shaded_io_netty_util_internal_CleanerJava6 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.DirectByteBuffer",
name = "cleaner")
private static long CLEANER_FIELD_OFFSET;
}

@TargetClass(className = "io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent")
final class Target_io_grpc_netty_shaded_io_netty_util_internal_PlatformDependent {
@Alias
@RecomputeFieldValue(kind = RecomputeFieldValue.Kind.ArrayBaseOffset, declClass = byte[].class)
private static long BYTE_ARRAY_BASE_OFFSET;
}

@TargetClass(className = "io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent0")
final class Target_io_grpc_netty_shaded_io_netty_util_internal_PlatformDependent0 {
@Alias
@RecomputeFieldValue(
kind = RecomputeFieldValue.Kind.FieldOffset,
declClassName = "java.nio.Buffer",
name = "address")
private static long ADDRESS_FIELD_OFFSET;
}

@TargetClass(className = "org.agrona.concurrent.AbstractConcurrentArrayQueue")
final class Target_org_agrona_concurrent_AbstractConcurrentArrayQueue {
@Alias

This file was deleted.

@@ -11,9 +11,6 @@ import java.util.concurrent.ConcurrentHashMap
final class ProtobufGeneratedMessageRegisterFeature extends Feature {
Member:

This should be modified. With the old version of protobuf that Akka used, the equals, hashCode and toString implementations used reflection, which is why every method needed a reflection configuration. But the new protobuf version they're on, when configured to be optimized for speed (rather than bytecode size), generates equals, hashCode and toString methods that don't use reflection. Furthermore, Akka never loads or interacts with any of the akka.protobufv3.internal.* subtypes using reflection; instead, Akka has hand-coded serializers that convert the Akka types to these types statically, and it uses short manifest names rather than class names, with static mappings of those names to the protobuf types, when deserializing. So we should be able to just remove akka.protobufv3.* from here; no reflection configuration should be needed for it.

We still do need some config for the Cloudstate messages that we expect to be serialized by the Akka protobuf serializer, but I think there might be some problems with the code below: the Akka protobuf serializer uses parseFrom and toByteArray reflectively, but I don't see those being registered. Also, I'm not sure how much overhead it adds to have configuration for classes that don't need it; we have a lot of protobuf classes that don't get loaded reflectively, and only those sent over Akka remoting as top-level messages need reflection configuration, which is only a handful of messages.
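
For illustration only, a minimal sketch (not the PR's actual feature) of registering just the reflectively-used members for one top-level message, along the lines described above. The class name com.example.MyTopLevelMessage is a placeholder, and which members truly need registration is an assumption taken from the comment:

import org.graalvm.nativeimage.hosted.{Feature, RuntimeReflection}

final class TopLevelMessageReflectionFeature extends Feature {
  override def beforeAnalysis(access: Feature.BeforeAnalysisAccess): Unit = {
    // Placeholder class name: only messages sent over Akka remoting as
    // top-level messages would need an entry like this.
    val clazz = access.findClassByName("com.example.MyTopLevelMessage")
    if (clazz != null) {
      RuntimeReflection.register(clazz)
      // The Akka protobuf serializer reportedly calls these reflectively.
      RuntimeReflection.register(clazz.getMethod("parseFrom", classOf[Array[Byte]]))
      RuntimeReflection.register(clazz.getMethod("toByteArray"))
    }
  }
}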

Contributor Author:

@jroper I'll try removing protobufv3 and report back

Contributor Author:

@jroper You were right! Removing the protobufv3 worked. I'll see if I can remove the need for the methods again.

Contributor Author:

@jroper It would be interesting to simplify this AutomaticFeature, but the challenge is the ridiculously long turnaround time to try even the smallest changes. Feel free (encouraged!) to find optimizations here! 👍

private[this] final val cache = ConcurrentHashMap.newKeySet[String]
final val messageClasses = Vector(
classOf[akka.protobuf.GeneratedMessage],
classOf[akka.protobuf.GeneratedMessage.Builder[_]],
classOf[akka.protobuf.ProtocolMessageEnum],
classOf[com.google.protobuf.GeneratedMessageV3],
classOf[com.google.protobuf.GeneratedMessageV3.Builder[_]],
classOf[com.google.protobuf.ProtocolMessageEnum],
6 changes: 2 additions & 4 deletions project/GraalVMPlugin.scala
@@ -366,10 +366,8 @@ object GraalVMPlugin extends AutoPlugin {
if (dep.data.isFile) Some(dep)
else {
projectArts.find { art =>
(art.get(sbt.Keys.artifact.key), dep.get(sbt.Keys.artifact.key)) match {
case (Some(l), Some(r)) =>
l.name == r.name && l.classifier == r.classifier
case _ => false
art.get(sbt.Keys.artifact.key).zip(dep.get(sbt.Keys.artifact.key)) exists {
case (l, r) => l.name == r.name && l.classifier == r.classifier
}
}
}
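
The refactor above replaces the tuple pattern match with Option#zip plus exists. A standalone sketch (names are illustrative, not from the build) showing the two forms agree:

// Zipping two Options pairs their values only when both are defined,
// so `exists` replaces the (Some(l), Some(r)) pattern match.
val l: Option[String] = Some("name")
val r: Option[String] = Some("name")

val sameByMatch = (l, r) match {
  case (Some(a), Some(b)) => a == b
  case _                  => false
}

val sameByZip = l.zip(r).exists { case (a, b) => a == b }

assert(sameByMatch == sameByZip)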
@@ -269,6 +269,9 @@
name: "akka.serialization.SerializationExtension$"
fields: [{name: "MODULE$"}]
}
{
name: "akka.serialization.Serialization"
}
{
name: "akka.util.ByteString$ByteString1"
}
@@ -279,19 +282,66 @@
name: "akka.util.ByteString$ByteStrings"
}
{
name: "akka.serialization.NullSerializer$"
allDeclaredConstructors: true
}
{
name: "akka.serialization.JavaSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.BooleanSerializer"
allDeclaredConstructors: true
}
{
name: "java.util.concurrent.TimeoutException"
}
{
name: "scala.Boolean"
}
{
name: "java.lang.Boolean"
}
{
name: "akka.serialization.ByteArraySerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.IntSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.LongSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.StringSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.ByteStringSerializer"
allDeclaredConstructors: true
}
{
name: "akka.serialization.DisabledJavaSerializer"
allDeclaredConstructors: true
}
{
name: "java.lang.Throwable"
}
{
name: "akka.actor.Address"
}
{
name: "akka.Done"
}
{
name: "akka.NotUsed"
}
{
name: "akka.actor.PoisonPill$"
}
{
name: "akka.actor.Kill$"
}
]
@@ -70,6 +70,10 @@
name: "akka.cluster.AutoDowning"
methods: [{name: "<init>", parameterTypes: ["akka.actor.ActorSystem"]}]
}
{
name: "akka.cluster.NoDowning"
methods: [{name: "<init>", parameterTypes: ["akka.actor.ActorSystem"]}]
}
{
name: "akka.cluster.protobuf.ClusterMessageSerializer"
allDeclaredConstructors: true
@@ -11,6 +11,9 @@
allDeclaredFields: true
allDeclaredConstructors: true
}
{
name: "akka.remote.RemoteWatcher$Heartbeat$"
}
{
name: "akka.remote.RemoteScope"
}
@@ -69,4 +72,11 @@
name: "akka.remote.serialization.IntSerializer"
allDeclaredConstructors: true
}
{
name: "akka.remote.serialization.ThrowableNotSerializableException"
}
{
name: "akka.remote.UniqueAddress"
}

]
@@ -12,6 +12,14 @@
{
name: "akka.stream.impl.StreamSupervisor"
}
{
name: "akka.stream.SystemMaterializer"
allDeclaredConstructors: true
}
{
name: "akka.stream.SystemMaterializer$"
allDeclaredConstructors: true
}
{
name: "akka.stream.impl.fusing.ActorGraphInterpreter"
allDeclaredFields: true
@@ -0,0 +1 @@
Args = -H:ReflectionConfigurationResources=${.}/reflect-config.json
@@ -0,0 +1,6 @@
[
{
"name": "com.typesafe.sslconfig.ssl.NoopHostnameVerifier"
Contributor Author:

@jroper @pvlugter We need to decide on what to choose for our usage—I have no idea and the documentation on the matter was not very helpful either. Right now it loads the NoopHostnameVerifier.

Member:

Depends on what it's being used for.

Contributor Author:

@patriknw Do you have any suggestion for which hostname verifier we should be using?

Comment:

This is sslconfig. That isn't used for Akka remoting. It was deprecated in Akka 2.6. I would be surprised if it's used by Akka gRPC. @raboof do you know? Or is it still used in Akka HTTP?

allDeclaredConstructors: true
}
]
14 changes: 11 additions & 3 deletions proxy/core/src/main/resources/cloudstate-common.conf
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
// Common configuration to be included by other impls

akka {
// FIXME library-extensions try to add a non-existent akka.stream.SystemMaterializer extension
library-extensions = ["akka.serialization.SerializationExtension"]
actor {
provider = cluster
allow-java-serialization = off
Comment:

default in Akka 2.6, so not strictly needed

Contributor Author:

Suggested change: remove the allow-java-serialization = off line.

warn-about-java-serializer-usage = off
internal-dispatcher = akka.actor.default-dispatcher // FIXME consider removing this line
Contributor Author:

@pvlugter @patriknw Probably best solved by a decision: given that we control everything that runs in this, I thought it was important to keep the number of threads down. If you both agree with this setting, then I can remove the comment.

Comment:

I think it's a valid choice for Cloudstate

Contributor Author:

Thanks @patriknw!

serializers {
crdt-serializers = "io.cloudstate.proxy.crdt.CrdtSerializers"
proto-any = "io.cloudstate.proxy.ProtobufAnySerializer"
@@ -39,6 +44,8 @@ akka {
artery {
enabled = on
transport = tcp
Comment:

both these are default in Akka 2.6, so not strictly needed

// FIXME do we need to set canonical.hostname?
// FIXME do we need to set advanced.tcp.connection-timeout?
Contributor Author:

@pvlugter @patriknw Should we set these two new settings, and to what?

Comment:

default connection-timeout is 5s, so unless you think that is wrong you should start with defaults

Comment:

If you don't set the hostname it will be InetAddress.getLocalHost.getHostAddress; maybe it would be good to have that as an optional environment variable here? See the sketch below.
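
A minimal sketch of that suggestion, mirroring how REMOTING_PORT is already handled just below; the variable name REMOTING_HOST is hypothetical:

// Hypothetical sketch: only override the canonical hostname when the
// REMOTING_HOST environment variable is set; otherwise Akka falls back
// to InetAddress.getLocalHost.getHostAddress.
canonical.hostname = ${?REMOTING_HOST}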

canonical.port = ${?REMOTING_PORT}
bind.port = ${?REMOTING_PORT}
}
@@ -47,16 +54,17 @@
cluster {
shutdown-after-unsuccessful-join-seed-nodes = 60s

// FIXME consider settin distributed-data.max-delta-elements = <N>
// FIXME consider setting distributed-data.delta-crdt.max-delta-size = <N>
Contributor Author:

@pvlugter @patriknw These are here since they are a part of the migration docs, and I didn't want them to be forgotten, but I also don't know what to set them to. Advise?

Comment:

Remove, and use defaults until you see a problem.
They're only mentioned in the migration guide because the defaults were changed (to better defaults).


sharding.state-store-mode = ddata

// Non-durable for now, since we can't get native-image to work with lmdb right now
sharding.distributed-data.durable.keys = []

// fixme Of course, this is not ideal, but not much choice at the moment.
auto-down-unreachable-after = 30s

sharding {
rebalance-interval = 5s
passivate-idle-entity-after = off // FIXME put in a good value here
Member:

And remove the built-in passivation using receive timeout. Looks like cloudstate.proxy.passivation-timeout is currently 30s. Could substitute that setting here.
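A sketch of that substitution (assuming cloudstate.proxy.passivation-timeout remains defined elsewhere in the proxy config; this is not part of the diff):

// Reuse the proxy's passivation timeout for sharding's built-in passivation,
// instead of the actor-level receive timeout.
passivate-idle-entity-after = ${cloudstate.proxy.passivation-timeout}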

Contributor Author:

@pvlugter I suspect that we need to remove the receive timeout thing at the same time to not have competing timers?

Member:

Yes, remove the receive timeout at the same time. Although, I can imagine that passivation timeouts are something users will want to define per entity type? Maybe this needs its own issue to be enhanced.

Contributor Author:

@pvlugter Makes sense to make the change separately from this PR. (Hence the FIXME)

Member:

Sure. Would be good to capture in an issue then, rather than just a FIXME.

}

// Native image doesn't support JMX
@@ -27,7 +27,7 @@ import akka.cluster.Cluster
import akka.management.cluster.bootstrap.ClusterBootstrap
import akka.management.scaladsl.AkkaManagement
import akka.pattern.{BackoffOpts, BackoffSupervisor}
import akka.stream.ActorMaterializer
import akka.stream.SystemMaterializer
import org.slf4j.LoggerFactory
import sun.misc.Signal

@@ -140,7 +140,7 @@ object CloudStateProxyMain {
}

implicit val system = configuration.fold(ActorSystem("cloudstate-proxy"))(c => ActorSystem("cloudstate-proxy", c))
implicit val materializer = ActorMaterializer()
implicit val materializer = SystemMaterializer(system)
import system.dispatcher

val c = system.settings.config.getConfig("cloudstate.proxy")
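
For context, a minimal standalone sketch (not from this diff) of obtaining the shared materializer in Akka 2.6: the SystemMaterializer extension itself is not a Materializer; its materializer member is.

import akka.actor.ActorSystem
import akka.stream.{Materializer, SystemMaterializer}

object SharedMaterializerSketch {
  // Minimal sketch: look up the system-wide materializer from the extension.
  def sharedMaterializer(system: ActorSystem): Materializer =
    SystemMaterializer(system).materializer
}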
@@ -31,7 +31,7 @@ import akka.cluster.singleton.{
ClusterSingletonProxySettings
}
import akka.grpc.GrpcClientSettings
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import com.google.protobuf.DescriptorProtos
import com.google.protobuf.Descriptors.{FileDescriptor, ServiceDescriptor}
import com.typesafe.config.Config
@@ -119,7 +119,7 @@ object EntityDiscoveryManager {
}
}

def props(config: Configuration)(implicit mat: ActorMaterializer): Props =
def props(config: Configuration)(implicit mat: Materializer): Props =
Props(new EntityDiscoveryManager(config))

final case object Ready // Responds with true / false
@@ -138,7 +138,7 @@ object EntityDiscoveryManager {
}

class EntityDiscoveryManager(config: EntityDiscoveryManager.Configuration)(
implicit mat: ActorMaterializer
implicit mat: Materializer
) extends Actor
with ActorLogging {

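A small standalone sketch (assumed, not from this PR) of why the signature can take the general Materializer: in Akka 2.6, an implicit classic ActorSystem in scope provides a Materializer implicitly.

import akka.actor.ActorSystem
import akka.stream.Materializer
import akka.stream.scaladsl.{Sink, Source}

object MaterializerParamSketch {
  // Takes the general Materializer rather than ActorMaterializer.
  def sumElements(implicit mat: Materializer) =
    Source(1 to 3).runWith(Sink.fold(0)(_ + _))

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    import system.dispatcher
    sumElements.foreach(println) // Materializer resolved from the implicit system
    system.terminate()
  }
}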
@@ -2,7 +2,7 @@ package io.cloudstate.proxy.eventing

import akka.{Done, NotUsed}
import akka.actor.Cancellable
import akka.stream.{ActorMaterializer, FlowShape, OverflowStrategy}
import akka.stream.{FlowShape, Materializer, OverflowStrategy}
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition, RunnableGraph, Sink, Source}
import io.cloudstate.protocol.entity.{ClientAction, EntityDiscoveryClient, Failure, Reply, UserFunctionError}
import io.cloudstate.proxy.{Serve, UserFunctionRouter}
@@ -71,7 +71,7 @@ object EventingManager {
else List(EventMapping(entity, endpoints))
}

def createSupport(eventConfig: Config)(implicit materializer: ActorMaterializer): Option[EventingSupport] =
def createSupport(eventConfig: Config)(implicit materializer: Materializer): Option[EventingSupport] =
eventConfig.getString("support") match {
case "none" =>
log.info("Eventing support turned off in configuration")