Skip to content
This repository has been archived by the owner on Mar 16, 2022. It is now read-only.

Passivation configured in language supports via the discovery protocol #486

Merged
merged 17 commits into from
Dec 17, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,15 @@ public CloudState registerEventSourcedEntity(

final String persistenceId;
final int snapshotEvery;
final int passivationTimeout;
if (entity.persistenceId().isEmpty()) {
persistenceId = entityClass.getSimpleName();
snapshotEvery = 0; // Default
passivationTimeout = 0; // Default
} else {
persistenceId = entity.persistenceId();
snapshotEvery = entity.snapshotEvery();
passivationTimeout = entity.passivationTimeout();
}

final AnySupport anySupport = newAnySupport(additionalDescriptors);
Expand All @@ -141,7 +144,8 @@ public CloudState registerEventSourcedEntity(
descriptor,
anySupport,
persistenceId,
snapshotEvery);
snapshotEvery,
passivationTimeout);

services.put(descriptor.getFullName(), system -> service);

Expand Down Expand Up @@ -169,6 +173,7 @@ public CloudState registerEventSourcedEntity(
Descriptors.ServiceDescriptor descriptor,
String persistenceId,
int snapshotEvery,
int passivationTimeout,
Descriptors.FileDescriptor... additionalDescriptors) {
services.put(
descriptor.getFullName(),
Expand All @@ -178,7 +183,8 @@ public CloudState registerEventSourcedEntity(
descriptor,
newAnySupport(additionalDescriptors),
persistenceId,
snapshotEvery));
snapshotEvery,
passivationTimeout));

return this;
}
Expand All @@ -205,13 +211,21 @@ public CloudState registerCrdtEntity(
entityClass + " does not declare an " + CrdtEntity.class + " annotation!");
}

final int passivationTimeout;
if (entity.passivationTimeout() == 0) {
passivationTimeout = 0; // Default
} else {
passivationTimeout = entity.passivationTimeout();
}

final AnySupport anySupport = newAnySupport(additionalDescriptors);

CrdtStatefulService service =
new CrdtStatefulService(
new AnnotationBasedCrdtSupport(entityClass, anySupport, descriptor),
descriptor,
anySupport);
anySupport,
passivationTimeout);

services.put(descriptor.getFullName(), system -> service);

Expand All @@ -233,11 +247,13 @@ public CloudState registerCrdtEntity(
public CloudState registerCrdtEntity(
CrdtEntityFactory factory,
Descriptors.ServiceDescriptor descriptor,
int passivationTimeout,
Descriptors.FileDescriptor... additionalDescriptors) {
services.put(
descriptor.getFullName(),
system ->
new CrdtStatefulService(factory, descriptor, newAnySupport(additionalDescriptors)));
new CrdtStatefulService(
factory, descriptor, newAnySupport(additionalDescriptors), passivationTimeout));

return this;
}
Expand Down Expand Up @@ -327,10 +343,13 @@ public CloudState registerEntity(
}

final String persistenceId;
final int passivationTimeout;
if (entity.persistenceId().isEmpty()) {
persistenceId = entityClass.getSimpleName();
passivationTimeout = 0; // Default
} else {
persistenceId = entity.persistenceId();
passivationTimeout = entity.passivationTimeout();
}

final AnySupport anySupport = newAnySupport(additionalDescriptors);
Expand All @@ -339,7 +358,8 @@ public CloudState registerEntity(
new AnnotationBasedEntitySupport(entityClass, anySupport, descriptor),
descriptor,
anySupport,
persistenceId);
persistenceId,
passivationTimeout);

services.put(descriptor.getFullName(), system -> service);

Expand All @@ -363,12 +383,17 @@ public CloudState registerEntity(
EntityFactory factory,
Descriptors.ServiceDescriptor descriptor,
String persistenceId,
int passivationTimeout,
Descriptors.FileDescriptor... additionalDescriptors) {
services.put(
descriptor.getFullName(),
system ->
new ValueEntityStatefulService(
factory, descriptor, newAnySupport(additionalDescriptors), persistenceId));
factory,
descriptor,
newAnySupport(additionalDescriptors),
persistenceId,
passivationTimeout));

return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@
@CloudStateAnnotation
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface CrdtEntity {}
public @interface CrdtEntity {

/**
* Specifies the idle time of the entity before it is passivated. Zero and any negative value mean
* use default from configuration file. Any positive value means the entity is passivated after
* the idle time.
*/
int passivationTimeout() default 0;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello Guy I didn't like this option for me it should be done at the Entity registration stage and not at the entity itself.

Copy link
Contributor Author

@ralphlaude ralphlaude Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi Adriano, it is also fine for me and it is easier. Why do you like the option over the registration?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great! Thanks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you like this option?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would 0 not be a valid value? And therefore passivation disabled at all?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and it only sends a timeout if it's been configured in the user function, rather than relying on negative numbers. And the protocol could have a duration message, with time units, to make things complete.

I agree to change the protocol now and a soon as possible for such changes. I also agree on not depending on magic values.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'd like to have options for expression the following things - perhaps not immediately, but we should choose a mechanism and protocol that allows us to add support for them in future if needed:

  • A specific passivation timeout, with at least millisecond resolution.
  • Default - note that I think it's fine for the default to come from the support library, I don't see any reason why the proxy needs to be in control of the default.
  • No passivation timeout - entities should never passivate unless rebalanced.
  • Immediate passivation, this effectively means the state is reloaded for each command.
  • Some way to indicate the proxy should apply smarter, more dynamic passivation strategies, for example strategies that respond to resource constraints, or perhaps auto-tune themselves based on cardinality and/or hot spot metrics.

So, basically, I think it's worth now choosing a strategy in the APIs and protocol that will allow us to add some of the features above without breaking the protocol or APIs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good idea to future-proof the protocol and APIs now for other passivation options.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really good idea for passivation options.
More generally we should have a strategy for adding other future options (not only passivation) without breaking the protocol or the APIs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some more discussion:

  • passivation timeout in protocol can just be milliseconds, we don't expect more fine-grained than that
  • language supports can use duration APIs or similar as appropriate, convert to milliseconds
  • protocol should support expanding to other passivation strategies, where initially it could just be timeout, ideally we can add never or immediate or other strategies in a protocol-compatible way
  • protocol should support strategies explicitly, such as timeout, never or immediate, and not use magic values

}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,11 @@
* that you specify it explicitly.
*/
String persistenceId() default "";

/**
* Specifies the idle time of the entity before it is passivated. Zero and any negative value mean
* use default from configuration file. Any positive value means the entity is passivated after
* the idle time.
*/
int passivationTimeout() default 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
/**
* The name of the persistence id.
*
* <p>If not specifed, defaults to the entities unqualified classname. It's strongly recommended
* <p>If not specified, defaults to the entities unqualified classname. It's strongly recommended
* that you specify it explicitly.
*/
String persistenceId() default "";
Expand All @@ -42,4 +42,11 @@
* snapshot at-or-after that number of events.
*/
int snapshotEvery() default 0;

/**
* Specifies the idle time of the entity before it is passivated. Zero and any negative value mean
* use default from configuration file. Any positive value means the entity is passivated after
* the idle time.
*/
int passivationTimeout() default 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,7 @@ final class CloudStateRunner private[this] (
}

/**
* StatefulService describes an entitiy type in a way which makes it possible
* to deploy.
* Service describes an entity type in a way which makes it possible to deploy.
*/
trait Service {

Expand All @@ -174,15 +173,24 @@ trait Service {

/**
* Possible values are: "", "", "".
* @return the type of entity represented by this StatefulService
* @return the type of entity represented by this service
*/
def entityType: String

/**
* @return the persistence identifier used for the the entities represented by this service
* @return the persistence identifier used for the entities represented by this service
*/
def persistenceId: String = descriptor.getName

// TODO JavaDoc
/**
* @return the idle time after which entities represented by this service should be passivated
*/
def passivationTimeout: Int

/**
* @return a dictionary of service methods (Protobuf Descriptors.MethodDescriptor) classified by method name.
* The dictionary values represent a mapping of Protobuf Descriptors.MethodDescriptor with its input
* and output types (see [[io.cloudstate.javasupport.impl.ResolvedServiceMethod]])
*/
def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class EntityDiscoveryImpl(system: ActorSystem, services: Map[String, Service]) e

val entities = services.map {
case (name, service) =>
Entity(service.entityType, name, service.persistenceId)
Entity(service.entityType, name, service.persistenceId, service.passivationTimeout)
}.toSeq

Future.successful(EntitySpec(fileDescriptorSet, entities, Some(serviceInfo)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import io.cloudstate.protocol.entity.{Failure, Forward, Reply, SideEffect, Metad

import scala.concurrent.Future
import scala.compat.java8.FutureConverters._
import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

final class ActionService(val actionHandler: ActionHandler,
override val descriptor: Descriptors.ServiceDescriptor,
Expand All @@ -45,6 +45,8 @@ final class ActionService(val actionHandler: ActionHandler,
}

override final val entityType = ActionProtocol.name

override final val passivationTimeout = 0
}

final class ActionProtocolImpl(_system: ActorSystem, services: Map[String, ActionService], rootContext: Context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,12 @@ import io.cloudstate.protocol.entity.{Command, Failure, StreamCancelled}
import com.google.protobuf.any.{Any => ScalaPbAny}
import com.google.protobuf.{Any => JavaPbAny}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

final class CrdtStatefulService(val factory: CrdtEntityFactory,
override val descriptor: Descriptors.ServiceDescriptor,
val anySupport: AnySupport)
val anySupport: AnySupport,
override val passivationTimeout: Int)
extends Service {
override final val entityType = Crdt.name

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ import scala.util.control.NonFatal
final class ValueEntityStatefulService(val factory: EntityFactory,
override val descriptor: Descriptors.ServiceDescriptor,
val anySupport: AnySupport,
override val persistenceId: String)
override val persistenceId: String,
override val passivationTimeout: Int)
extends Service {

override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory,
override val descriptor: Descriptors.ServiceDescriptor,
val anySupport: AnySupport,
override val persistenceId: String,
val snapshotEvery: Int)
val snapshotEvery: Int,
override val passivationTimeout: Int)
extends Service {

override def resolvedMethods: Option[Map[String, ResolvedServiceMethod[_, _]]] =
Expand All @@ -64,7 +65,12 @@ final class EventSourcedStatefulService(val factory: EventSourcedEntityFactory,
override final val entityType = EventSourced.name
final def withSnapshotEvery(snapshotEvery: Int): EventSourcedStatefulService =
if (snapshotEvery != this.snapshotEvery)
new EventSourcedStatefulService(this.factory, this.descriptor, this.anySupport, this.persistenceId, snapshotEvery)
new EventSourcedStatefulService(this.factory,
this.descriptor,
this.anySupport,
this.persistenceId,
snapshotEvery,
this.passivationTimeout)
else
this
}
Expand Down
3 changes: 3 additions & 0 deletions protocols/protocol/cloudstate/entity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ message Entity {
// The ID to namespace state by. How this is used depends on the type of entity, for example,
// event sourced entities will prefix this to the persistence id.
string persistence_id = 3;

// The idle timeout, in seconds, after which an entity is passivated.
int32 passivation_timeout = 4;
}

message UserFunctionError {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package io.cloudstate.proxy.crdt

import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import java.util.concurrent.TimeUnit

import akka.{Done, NotUsed}
import akka.actor.{ActorRef, ActorSystem, CoordinatedShutdown}
Expand All @@ -34,7 +35,6 @@ import io.cloudstate.protocol.entity.{Entity, EntityDiscovery, Metadata}
import io.cloudstate.proxy._
import io.cloudstate.proxy.entity.{EntityCommand, UserFunctionReply}

import scala.collection.JavaConverters._
import scala.concurrent.{ExecutionContext, Future}
import scala.concurrent.duration._

Expand All @@ -54,12 +54,7 @@ class CrdtSupportFactory(system: ActorSystem,

validate(serviceDescriptor, methodDescriptors)

val crdtEntityConfig = CrdtEntity.Configuration(entity.serviceName,
entity.persistenceId,
config.crdtSettings.passivationTimeout,
config.relayOutputBufferSize,
3.seconds,
5.seconds)
val crdtEntityConfig = crdtEntityConfiguration(config, entity)

log.debug("Starting CrdtEntity for {}", entity.serviceName)

Expand Down Expand Up @@ -99,6 +94,24 @@ class CrdtSupportFactory(system: ActorSystem,
)
}
}

private def crdtEntityConfiguration(config: EntityDiscoveryManager.Configuration,
entity: Entity): CrdtEntity.Configuration = {
val passivationTimeout = if (entity.passivationTimeout > 0) {
Timeout(Duration(entity.passivationTimeout, TimeUnit.SECONDS).toMillis.millis)
} else {
config.crdtSettings.passivationTimeout
}

CrdtEntity.Configuration(
entity.serviceName,
entity.persistenceId,
passivationTimeout,
config.relayOutputBufferSize,
3.seconds, // TODO make it configurable
5.seconds // TODO make it configurable
)
}
}

private class CrdtSupport(crdtEntity: ActorRef, parallelism: Int, private implicit val relayTimeout: Timeout)
Expand Down
Loading