Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Passivation configured in language supports via the discovery protocol #486

Merged
merged 17 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ public interface PassivationStrategy {
* @return the passivation strategy
*/
static PassivationStrategy defaultTimeout() {
return Timeout.apply();
return new Timeout(CloudStateConfigHolder.defaultPassivationTimeout());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Copyright 2019 Lightbend Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.cloudstate.javasupport

import java.time.Duration

import com.typesafe.config.{Config, ConfigFactory}

/**
* The CloudStateConfigHolder is responsible for loading and holding the default configuration.
* This configuration is used by [[io.cloudstate.javasupport.CloudState]] and
* [[io.cloudstate.javasupport.PassivationStrategy#defaultTimeout()]].
*/
private[javasupport] object CloudStateConfigHolder {

private val configuration: Config = {
val conf = ConfigFactory.load()
conf.getConfig("cloudstate.system").withFallback(conf)
}

def defaultConfiguration(): Config = configuration

def defaultPassivationTimeout(): Duration = configuration.getDuration("cloudstate.passivation-timeout")
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ package io.cloudstate.javasupport

import java.util.concurrent.CompletionStage

import com.typesafe.config.{Config, ConfigFactory}
import com.typesafe.config.Config
import akka.Done
import akka.actor.ActorSystem
import akka.http.scaladsl._
Expand Down Expand Up @@ -79,15 +79,16 @@ final class CloudStateRunner private[this] (
case (serviceName, factory) => serviceName -> factory(system)
}.toMap

// TODO JavaDoc
/**
* Creates a CloudStateRunner from the given services. Use the default config to create the internal ActorSystem.
*/
def this(services: java.util.Map[String, java.util.function.Function[ActorSystem, Service]]) {
this(ActorSystem("StatefulService", {
val conf = ConfigFactory.load()
conf.getConfig("cloudstate.system").withFallback(conf)
}), services.asScala.toMap)
this(ActorSystem("StatefulService", CloudStateConfigHolder.defaultConfiguration()), services.asScala.toMap)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's the other constructor, that takes a config explicitly, that should be supported. The config gets passed to the actor system. It should also be the config that gets passed through to other places (could be retrieved from the actor system). Might be more obvious if there's a test that sets the passivation timeout in config?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvlugter I hope my solution does the job and I did not find a better way.

}

// TODO JavaDoc
/**
* Creates a CloudStateRunner from the given services and config. Use the config to create the internal ActorSystem.
*/
def this(services: java.util.Map[String, java.util.function.Function[ActorSystem, Service]], config: Config) {
this(ActorSystem("StatefulService", config), services.asScala.toMap)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,6 @@ package io.cloudstate.javasupport.impl

import java.time.Duration

import com.typesafe.config.ConfigFactory
import io.cloudstate.javasupport.PassivationStrategy

private[impl] case class Timeout(duration: Duration) extends PassivationStrategy

private[impl] object Timeout {

private val defaultPassivationTimeout: Duration = {
val config = ConfigFactory.load()
config.getDuration("cloudstate.passivation-timeout")
}

def apply(): Timeout = Timeout(defaultPassivationTimeout)
}
4 changes: 2 additions & 2 deletions protocols/protocol/cloudstate/entity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ message Entity {
// The semantics is to provide a flexible way for entity user functions to configure the passivation strategy.
// This strategy is sent to the proxy at discovery time allowing the proxy to configure the corresponding entities.
// The only passivation strategy supported is the timeout strategy and configuring this is optional for the entity.
// If an entity user function does not configure the passivation strategy the proxy used its fallback default value.
// If an entity user function does not configure the passivation strategy the proxy uses its fallback default value.
//
// The passivation strategy for the entity user function.
message EntityPassivationStrategy {
Expand All @@ -281,7 +281,7 @@ message EntityPassivationStrategy {
}
}

// A passivation strategy based on a timeout. The idle timeout after which an user function's entity is passivated.
// A passivation strategy based on a timeout. The idle timeout after which a user function's entity is passivated.
message TimeoutPassivationStrategy {
// The timeout in millis
int64 timeout = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,9 @@ class CrdtSupportFactory(system: ActorSystem,
val methodsWithoutKeys = methodDescriptors.values.filter(_.keyFieldsCount < 1)
if (methodsWithoutKeys.nonEmpty) {
val offendingMethods = methodsWithoutKeys.map(_.method.getName).mkString(",")
throw EntityDiscoveryException(
s"""CRDT entities do not support methods whose parameters do not have at least one field marked as entity_key,
|but ${serviceDescriptor.getFullName} has the following methods without keys: ${offendingMethods}""".stripMargin
.replaceAll("\n", " ")
throw new EntityDiscoveryException(
"CRDT entities do not support methods whose parameters do not have at least one field marked as entity_key, " +
s"but ${serviceDescriptor.getFullName} has the following methods without keys: ${offendingMethods}"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,9 @@ class EventSourcedSupportFactory(
val methodsWithoutKeys = methodDescriptors.values.filter(_.keyFieldsCount < 1)
if (methodsWithoutKeys.nonEmpty) {
val offendingMethods = methodsWithoutKeys.map(_.method.getName).mkString(",")
throw EntityDiscoveryException(
s"""Event sourced entities do not support methods whose parameters do not have at least one field marked as entity_key,
|but ${serviceDescriptor.getFullName} has the following methods without keys: ${offendingMethods}""".stripMargin
.replaceAll("\n", " ")
throw new EntityDiscoveryException(
"Event sourced entities do not support methods whose parameters do not have at least one field marked as entity_key, " +
s"but ${serviceDescriptor.getFullName} has the following methods without keys: ${offendingMethods}"
)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,9 +101,8 @@ class EntitySupportFactory(
if (methodsWithoutKeys.nonEmpty) {
val offendingMethods = methodsWithoutKeys.map(_.method.getName).mkString(",")
throw EntityDiscoveryException(
s"""Value based entities do not support methods whose parameters do not have at least one field marked as entity_key,
|but ${serviceDescriptor.getFullName} has the following methods without keys: $offendingMethods""".stripMargin
.replaceAll("\n", " ")
"Value based entities do not support methods whose parameters do not have at least one field marked as entity_key, " +
s"but ${serviceDescriptor.getFullName} has the following methods without keys: $offendingMethods"
)
}
}
Expand Down
19 changes: 18 additions & 1 deletion tck/src/main/scala/io/cloudstate/tck/CloudStateTCK.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import com.google.protobuf.any.{Any => ScalaPbAny}
import com.typesafe.config.{Config, ConfigFactory}
import io.cloudstate.protocol.action._
import io.cloudstate.protocol.crdt.Crdt
import io.cloudstate.protocol.entity.EntityPassivationStrategy.Strategy
import io.cloudstate.protocol.entity.{EntityPassivationStrategy, TimeoutPassivationStrategy}
import io.cloudstate.protocol.value_entity.ValueEntity
import io.cloudstate.protocol.event_sourced._
import io.cloudstate.tck.model.valueentity.valueentity.{ValueEntityTckModel, ValueEntityTwo}
Expand All @@ -43,6 +45,7 @@ import io.grpc.StatusRuntimeException
import io.cloudstate.tck.model.eventsourced.{EventSourcedTckModel, EventSourcedTwo}
import io.cloudstate.testkit.valueentity.ValueEntityMessages
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.matchers.Matcher
import org.scalatest.{BeforeAndAfterAll, MustMatchers, WordSpec}

import scala.concurrent.duration._
Expand Down Expand Up @@ -109,6 +112,13 @@ class CloudStateTCK(description: String, settings: CloudStateTCK.Settings)
"verify proxy info and entity discovery" in {
import scala.jdk.CollectionConverters._

def verifyTimeoutPassivationStrategy(strategy: Option[EntityPassivationStrategy]): Unit =
strategy must (be(
Some(
EntityPassivationStrategy(Strategy.Timeout(TimeoutPassivationStrategy(30.seconds.toMillis)))
)
) or be(None))

expectProxyOnline()

val discovery = interceptor.expectEntityDiscovery()
Expand All @@ -135,54 +145,61 @@ class CloudStateTCK(description: String, settings: CloudStateTCK.Settings)
spec.entities.find(_.serviceName == ActionTckModel.name).foreach { entity =>
serviceNames must contain("ActionTckModel")
entity.entityType mustBe ActionProtocol.name
entity.passivationStrategy mustBe None
}

spec.entities.find(_.serviceName == ActionTwo.name).foreach { entity =>
serviceNames must contain("ActionTwo")
entity.entityType mustBe ActionProtocol.name
entity.passivationStrategy mustBe None
}

spec.entities.find(_.serviceName == EventSourcedTckModel.name).foreach { entity =>
serviceNames must contain("EventSourcedTckModel")
entity.entityType mustBe EventSourced.name
entity.persistenceId mustBe "event-sourced-tck-model"
verifyTimeoutPassivationStrategy(entity.passivationStrategy)
}

spec.entities.find(_.serviceName == EventSourcedTwo.name).foreach { entity =>
serviceNames must contain("EventSourcedTwo")
entity.entityType mustBe EventSourced.name
verifyTimeoutPassivationStrategy(entity.passivationStrategy)
}

spec.entities.find(_.serviceName == EventSourcedShoppingCart.name).foreach { entity =>
serviceNames must contain("ShoppingCart")
entity.entityType mustBe EventSourced.name
entity.persistenceId must not be empty
verifyTimeoutPassivationStrategy(entity.passivationStrategy)
}

spec.entities.find(_.serviceName == ValueEntityTckModel.name).foreach { entity =>
serviceNames must contain("ValueEntityTckModel")
entity.entityType mustBe ValueEntity.name
entity.persistenceId mustBe "value-entity-tck-model"
verifyTimeoutPassivationStrategy(entity.passivationStrategy)
}

spec.entities.find(_.serviceName == ValueEntityTwo.name).foreach { entity =>
serviceNames must contain("ValueEntityTwo")
entity.entityType mustBe ValueEntity.name
entity.persistenceId mustBe "value-entity-tck-model-two"
verifyTimeoutPassivationStrategy(entity.passivationStrategy)
}

spec.entities.find(_.serviceName == ValueEntityShoppingCart.name).foreach { entity =>
serviceNames must contain("ShoppingCart")
entity.entityType mustBe ValueEntity.name
entity.persistenceId must not be empty
verifyTimeoutPassivationStrategy(entity.passivationStrategy)
}

enabledServices = spec.entities.map(_.serviceName)
}
}

"verifying model test: actions" must {
import io.cloudstate.protocol.entity
import io.cloudstate.tck.model.action._
import io.cloudstate.testkit.action.ActionMessages._

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package io.cloudstate.testkit.discovery

import java.util.concurrent.TimeUnit

import akka.grpc.ServiceDescription
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.testkit.TestProbe
Expand Down Expand Up @@ -45,6 +43,7 @@ final class TestEntityDiscoveryService(context: TestServiceContext) {

object TestEntityDiscoveryService {
val info: ServiceInfo = ServiceInfo(supportLibraryName = "Cloudstate TestKit")

private val passivationStrategyTimeout: Strategy.Timeout =
Strategy.Timeout(TimeoutPassivationStrategy(30.seconds.toMillis))

Expand Down