Skip to content

Commit

Permalink
Typed Command Gateway can now be created multiple times per type of a…
Browse files Browse the repository at this point in the history
…ggregate root (#64)
  • Loading branch information
olger authored Mar 9, 2021
1 parent 716af39 commit f97c9f0
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ class DefaultTypedCommandGateway[Cmd <: DomainCommand](
}
}

val gateway = system.systemActorOf(CommandGatewayGuardian(), "typedcommandgateway")
val gateway = system.systemActorOf(CommandGatewayGuardian(), "typedcommandgateway-" + aggregateRootCreator.getClass.getSimpleName)

override def ask[Res](aggregateRootId: String, replyTo: ActorRef[Res] => Cmd)(
implicit validator: ValidateableCommand[Cmd]
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Copyright (C) 2016-2021 Cafienne B.V. <https://www.cafienne.io/bounded>
*/

package io.cafienne.bounded.aggregate

import akka.actor.typed.Behavior
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, ReplyEffect}
import com.typesafe.scalalogging.Logger
import io.cafienne.bounded.aggregate.typed.TypedAggregateRootManager
import scala.concurrent.duration._
import akka.actor.typed.scaladsl.{Behaviors, TimerScheduler}
import akka.persistence.RecoveryCompleted

//This another aggregate is for testing the use of command gateways with multiple aggregates.
object TypedAnotherAggregate {
val aggregateRootTag = "ar-another"
import TypedSimpleAggregate._

var replayed = false

final val logger = Logger(TypedSimpleAggregate.getClass.getSimpleName)


// Command handler logic
def commandHandler(
timers: TimerScheduler[SimpleAggregateCommand]
): (SimpleAggregateState, SimpleAggregateCommand) => ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = {
(state, command) =>
logger.debug("Received command: " + command)
command match {
case cmd: Create => createAggregate(cmd)
case cmd: AddItem => addItem(cmd)
case cmd: Stop =>
logger.debug("Stopping Aggregate {}", cmd.aggregateRootId)
Effect.stop().thenReply(cmd.replyTo)(_ => OK)
case cmd: StopAfter =>
timers.startSingleTimer(
InternalStop(aggregateRootId = cmd.aggregateRootId, metaData = cmd.metaData),
2.seconds
)
Effect.none.thenReply(cmd.replyTo)(_ => OK)
case cmd: InternalStop =>
logger.debug("InternalStop for aggregate {}", cmd.aggregateRootId)
Effect.stop().thenNoReply()
}
}

private def createAggregate(cmd: Create): ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = {
logger.debug(s"Create Aggregate (replayed: $replayed)" + cmd)
Effect.persist(Created(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData))).thenReply(cmd.replyTo)(_ OK)
}

private def addItem(cmd: AddItem): ReplyEffect[SimpleAggregateEvent, SimpleAggregateState] = {
logger.debug("AddItem " + cmd)
Effect
.persist(ItemAdded(cmd.aggregateRootId, TestMetaData.fromCommand(cmd.metaData), cmd.item))
.thenReply(cmd.replyTo)(_ OK)
}

// event handler to keep internal aggregate state
val eventHandler: (SimpleAggregateState, SimpleAggregateEvent) => SimpleAggregateState = { (state, event) =>
logger.debug("updating state with {}", event)
state.update(event)
}

}

import TypedSimpleAggregate._
class AnotherAggregateManager() extends TypedAggregateRootManager[SimpleAggregateCommand] {
import TypedSimpleAggregate._
final val logger = Logger(this.getClass.getSimpleName)

override def behavior(id: String): Behavior[SimpleAggregateCommand] = {
logger.debug("Create aggregate behavior for {}", id)
Behaviors.withTimers { timers
EventSourcedBehavior
.withEnforcedReplies(
PersistenceId(aggregateRootTag, id),
SimpleAggregateState(List.empty[String]),
commandHandler(timers),
eventHandler
)
.receiveSignal {
case (state, b: RecoveryCompleted) => {
logger.debug("Received RecoveryCompleted for actor with state {}", state)
replayed = true
}
}
}
}

override def entityTypeKey: EntityTypeKey[SimpleAggregateCommand] =
EntityTypeKey[SimpleAggregateCommand](aggregateRootTag)
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,13 @@ class TypedCommandGatewaySpec
}
}

"Command Gateway" should "be able to be created multiple times to route to other aggregates" in {
val creator = new AnotherAggregateManager()
val anotherTypedCommandGateway = new DefaultTypedCommandGateway[SimpleAggregateCommand](system, creator, 6.seconds)

assert(anotherTypedCommandGateway != null)
}

protected override def afterAll(): Unit = {
Await.ready(typedCommandGateway.shutdown(), 5.seconds).map(_ => (): Unit)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ object TypedSimpleAggregate {
final case class Stop(aggregateRootId: String, metaData: CommandMetaData, replyTo: ActorRef[Response])
extends SimpleAggregateCommand

private final case class InternalStop(aggregateRootId: String, metaData: CommandMetaData)
final case class InternalStop(aggregateRootId: String, metaData: CommandMetaData)
extends SimpleAggregateCommand

final case class StopAfter(
Expand Down
8 changes: 4 additions & 4 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@

lazy val basicSettings = {
val scala213 = "2.13.4"
val scala213 = "2.13.5"
val scala212 = "2.12.13"
val supportedScalaVersions = List(scala213, scala212)

Seq(
organization := "io.cafienne",
organization := "io.cafienne.bounded",
description := "Scala and Akka based Domain Driven Design Framework",
scalaVersion := scala213,
crossScalaVersions := supportedScalaVersions,
releaseCrossBuild := true,
//crossScalaVersions := supportedScalaVersions,
//releaseCrossBuild := false,
scalacOptions := Seq(
"-encoding", "UTF-8",
//"-target:jvm-1.8",
Expand Down
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

version in ThisBuild := "0.3.1"
version in ThisBuild := "0.3.2"

0 comments on commit f97c9f0

Please sign in to comment.