diff --git a/bounded-core/src/main/scala/io/cafienne/bounded/aggregate/typed/DefaultTypedCommandGateway.scala b/bounded-core/src/main/scala/io/cafienne/bounded/aggregate/typed/DefaultTypedCommandGateway.scala index f99d511..c0635b8 100644 --- a/bounded-core/src/main/scala/io/cafienne/bounded/aggregate/typed/DefaultTypedCommandGateway.scala +++ b/bounded-core/src/main/scala/io/cafienne/bounded/aggregate/typed/DefaultTypedCommandGateway.scala @@ -100,7 +100,7 @@ class DefaultTypedCommandGateway[Cmd <: DomainCommand]( } } - val gateway = system.systemActorOf(CommandGatewayGuardian(), "typedcommandgateway") + val gateway = system.systemActorOf(CommandGatewayGuardian(), "typedcommandgateway-" + aggregateRootCreator.getClass.getSimpleName) override def ask[Res](aggregateRootId: String, replyTo: ActorRef[Res] => Cmd)( implicit validator: ValidateableCommand[Cmd] diff --git a/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedAnotherAggregate.scala b/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedAnotherAggregate.scala new file mode 100644 index 0000000..170edf7 --- /dev/null +++ b/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedAnotherAggregate.scala @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2016-2021 Cafienne B.V. + */ + +package io.cafienne.bounded.aggregate + +import akka.actor.typed.Behavior +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.persistence.typed.PersistenceId +import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplyEffect} +import com.typesafe.scalalogging.Logger +import io.cafienne.bounded.aggregate.typed.TypedAggregateRootManager +import scala.concurrent.duration._ +import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler} +import akka.persistence.RecoveryCompleted + +//This another aggregate is for testing the use of command gateways with multiple aggregates. +object TypedAnotherAggregate { + val aggregateRootTag = "ar-another" + import TypedSimpleAggregate._ + + var replayed = false + + final val logger = Logger(TypedSimpleAggregate.getClass.getSimpleName) + + + // Command handler logic + def commandHandler( + timers: TimerScheduler[SimpleAggregateCommand] + ): (SimpleAggregateState, SimpleAggregateCommand) => ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = { + (state, command) => + logger.debug("Received command: " + command) + command match { + case cmd: Create => createAggregate(cmd) + case cmd: AddItem => addItem(cmd) + case cmd: Stop => + logger.debug("Stopping Aggregate {}", cmd.aggregateRootId) + Effect.stop().thenReply(cmd.replyTo)(_ => OK) + case cmd: StopAfter => + timers.startSingleTimer( + InternalStop(aggregateRootId = cmd.aggregateRootId, metaData = cmd.metaData), + 2.seconds + ) + Effect.none.thenReply(cmd.replyTo)(_ => OK) + case cmd: InternalStop => + logger.debug("InternalStop for aggregate {}", cmd.aggregateRootId) + Effect.stop().thenNoReply() + } + } + + private def createAggregate(cmd: Create): ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = { + logger.debug(s"Create Aggregate (replayed: $replayed)" + cmd) + Effect.persist(Created(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData))).thenReply(cmd.replyTo)(_ ⇒ OK) + } + + private def addItem(cmd: AddItem): ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = { + logger.debug("AddItem " + cmd) + Effect + .persist(ItemAdded(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData), cmd.item)) + .thenReply(cmd.replyTo)(_ ⇒ OK) + } + + // event handler to keep internal aggregate state + val eventHandler: (SimpleAggregateState, SimpleAggregateEvent) => SimpleAggregateState = { (state, event) => + logger.debug("updating state with {}", event) + state.update(event) + } + +} + +import TypedSimpleAggregate._ +class AnotherAggregateManager() extends TypedAggregateRootManager[SimpleAggregateCommand] { + import TypedSimpleAggregate._ + final val logger = Logger(this.getClass.getSimpleName) + + override def behavior(id: String): Behavior[SimpleAggregateCommand] = { + logger.debug("Create aggregate behavior for {}", id) + Behaviors.withTimers { timers ⇒ + EventSourcedBehavior + .withEnforcedReplies( + PersistenceId(aggregateRootTag, id), + SimpleAggregateState(List.empty[String]), + commandHandler(timers), + eventHandler + ) + .receiveSignal { + case (state, b: RecoveryCompleted) => { + logger.debug("Received RecoveryCompleted for actor with state {}", state) + replayed = true + } + } + } + } + + override def entityTypeKey: EntityTypeKey[SimpleAggregateCommand] = + EntityTypeKey[SimpleAggregateCommand](aggregateRootTag) +} diff --git a/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedCommandGatewaySpec.scala b/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedCommandGatewaySpec.scala index e4f8ce9..f921d7f 100644 --- a/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedCommandGatewaySpec.scala +++ b/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedCommandGatewaySpec.scala @@ -151,6 +151,13 @@ class TypedCommandGatewaySpec } } + "Command Gateway" should "be able to be created multiple times to route to other aggregates" in { + val creator = new AnotherAggregateManager() + val anotherTypedCommandGateway = new DefaultTypedCommandGateway[SimpleAggregateCommand](system, creator, 6.seconds) + + assert(anotherTypedCommandGateway != null) + } + protected override def afterAll(): Unit = { Await.ready(typedCommandGateway.shutdown(), 5.seconds).map(_ => (): Unit) } diff --git a/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedSimpleAggregate.scala b/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedSimpleAggregate.scala index b737416..51aa7cb 100644 --- a/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedSimpleAggregate.scala +++ b/bounded-core/src/test/scala/io/cafienne/bounded/aggregate/TypedSimpleAggregate.scala @@ -48,7 +48,7 @@ object TypedSimpleAggregate { final case class Stop(aggregateRootId: String, metaData: CommandMetaData, replyTo: ActorRef[Response]) extends SimpleAggregateCommand - private final case class InternalStop(aggregateRootId: String, metaData: CommandMetaData) + final case class InternalStop(aggregateRootId: String, metaData: CommandMetaData) extends SimpleAggregateCommand final case class StopAfter( diff --git a/build.sbt b/build.sbt index f230a67..890795a 100644 --- a/build.sbt +++ b/build.sbt @@ -1,15 +1,15 @@ lazy val basicSettings = { - val scala213 = "2.13.4" + val scala213 = "2.13.5" val scala212 = "2.12.13" val supportedScalaVersions = List(scala213, scala212) Seq( - organization := "io.cafienne", + organization := "io.cafienne.bounded", description := "Scala and Akka based Domain Driven Design Framework", scalaVersion := scala213, - crossScalaVersions := supportedScalaVersions, - releaseCrossBuild := true, + //crossScalaVersions := supportedScalaVersions, + //releaseCrossBuild := false, scalacOptions := Seq( "-encoding", "UTF-8", //"-target:jvm-1.8", diff --git a/version.sbt b/version.sbt index b01c8c7..d08e1fa 100644 --- a/version.sbt +++ b/version.sbt @@ -1,2 +1,2 @@ -version in ThisBuild := "0.3.1" +version in ThisBuild := "0.3.2"