Skip to content

Commit

Permalink
Fix on Projection testing (#69)
Browse files Browse the repository at this point in the history
* Changed the TestableProjection to wait for the last message in another way
* Ready for release 0.3.5
  • Loading branch information
olger authored Sep 17, 2021
1 parent f8f00c8 commit 80b3223
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 33 deletions.
25 changes: 13 additions & 12 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,9 @@ jobs:
build:
working_directory: ~/bounded-framework
docker:
- image: circleci/openjdk:11-jdk
- image: cimg/openjdk:11.0
environment:
CODECOV_TOKEN: "abba3073-45f3-4b09-8a18-2cb63f72bbe5"
SBT_VERSION: 1.5.2
SBT_OPTS: "-XX:MaxMetaspaceSize=512m"
JVM_OPTS: -Xmx3200m
TERM: dumb
Expand All @@ -15,23 +14,25 @@ jobs:
- run:
name: Get sbt binary
command: |
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt
sudo apt-get clean && sudo apt-get autoclean
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo apt-key add
sudo apt-get update
sudo apt-get install sbt
sudo apt-get clean && sudo apt-get autoclean
- checkout
- restore_cache:
keys:
- v1-dependencies-{{ checksum "build.sbt" }}
# fallback to using the latest cache if no exact match is found
- v1-dependencies-
- run:
name: Run tests
#command: cat /dev/null | sbt clean coverage test coverageReport
command: cat /dev/null | sbt clean test
#- store_artifacts: # for display in Artifacts: https://circleci.com/docs/2.0/artifacts/
# sbt-scoverage is not yet compatible with 2.12.13
# https://github.com/scoverage/sbt-scoverage/issues/319
# We are running 2.13, so should not be an issue
name: Run tests
command: cat /dev/null | sbt coverage test coverageReport
#- store_artifacts: # for display in Artifacts: https://circleci.com/docs/2.0/artifacts/
#path: target/universal/samplescala.zip
#destination: samplescala
- save_cache:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

package io.cafienne.bounded.test

import akka.Done

import java.util.UUID
import akka.actor._
import akka.persistence.inmemory.extension.{InMemoryJournalStorage, InMemorySnapshotStorage, StorageExtension}
Expand Down Expand Up @@ -64,20 +66,20 @@ class TestableProjection private (system: ActorSystem, timeout: Timeout, tags: S
eventMaterializers.get.startUp(true).map(list => list.head)
}

def addEvents(evt: Seq[DomainEvent]): Unit = {
def addEvents(evt: Seq[DomainEvent]): Future[Done] = {
eventMaterializers.fold(throw new IllegalStateException("You start the projection before you add events"))(_ => {
storeEvents(evt)
})
}

def addEvent(evt: DomainEvent): Unit = {
def addEvent(evt: DomainEvent): Future[Done] = {
eventMaterializers.fold(throw new IllegalStateException("You start the projection before you add events"))(_ => {
storeEvents(Seq(evt))
})
}

// Blocking way to store events.
private def storeEvents(evt: Seq[DomainEvent]): Unit = {
private def storeEvents(evt: Seq[DomainEvent]): Future[Done] = {

// evt.groupBy(evt => evt.id).foreach(grp => persistenceTestKit.persistForRecovery(grp._1, grp._2))

Expand All @@ -101,12 +103,20 @@ class TestableProjection private (system: ActorSystem, timeout: Timeout, tags: S
waitTillLastEventIsProcessed(evt)
}

private def waitTillLastEventIsProcessed(evt: Seq[DomainEvent]) = {
private def waitTillLastEventIsProcessed(evt: Seq[DomainEvent]): Future[Done] = {
if (materializerId.isDefined) {
eventStreamListener.fishForSpecificMessage(timeout.duration, "wait till last event is processed") {
case e: EventProcessed if materializerId.get == e.materializerId && evt.last == e.evt =>
system.log.debug("catch " + e)
var messageProcessedCounter = 0
while (messageProcessedCounter < evt.length) {
eventStreamListener.fishForSpecificMessage(timeout.duration, "wait till last event is processed") {
case e: EventProcessed if materializerId.get == e.materializerId =>
messageProcessedCounter += 1
system.log.debug("catch " + e)
case other => system.log.debug("other message fished: {}", other)
}
}
Future.successful(Done)
} else {
Future.failed(new Exception("No materializer with id:" + materializerId + " found"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import io.cafienne.bounded.aggregate.{DomainCommand, DomainEvent, HandlingFailur

import java.time.OffsetDateTime
import java.util.UUID
import scala.concurrent.duration.Duration

object DomainProtocol {

Expand Down Expand Up @@ -67,8 +68,13 @@ object DomainProtocol {

case class UpdateState(metaData: CommandMetaData, aggregateRootId: String, state: String) extends DomainCommand

case class UpdateStateSlow(metaData: CommandMetaData, aggregateRootId: String, state: String, waitFor: Duration)
extends DomainCommand

case class StateUpdated(metaData: MetaData, id: String, state: String) extends DomainEvent

case class SlowStateUpdated(metaData: MetaData, id: String, state: String, waited: Long) extends DomainEvent

//This can be sent but is not handled so gives a Ko(UnExpectedCommand)
case class CommandWithoutHandler(metaData: CommandMetaData, aggregateRootId: String, msg: String)
extends DomainCommand
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,23 @@ class TestAggregateRoot(aggregateRootId: String) extends AggregateRootActor[Test
} else {
Ko(StateTransitionForbidden(aggregateState.map(_.state), state))
}
case UpdateStateSlow(metaData, aggregateRootId, state, waitFor) =>
Thread.sleep(waitFor.length)
val testMetaData = metaData.asInstanceOf[TestCommandMetaData]
if (aggregateState.isDefined) {
Ok(
Seq(
SlowStateUpdated(
TestMetaData.fromCommand(testMetaData),
aggregateRootId,
state,
waitFor.length
)
)
)
} else {
Ko(StateTransitionForbidden(aggregateState.map(_.state), state))
}
case other => Ko(new UnexpectedCommand(other))
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,22 @@ import java.time.OffsetDateTime

class TestableProjectionSpec extends AsyncWordSpecLike with Matchers with ScalaFutures with BeforeAndAfterAll {

implicit val timeout = Timeout(10.seconds)
implicit val timeout = Timeout(30.seconds)
implicit val system =
ActorSystem("TestableProjectionSpecSystem", SpecConfig.testConfig)

implicit val logger: LoggingAdapter = Logging(system, getClass)

val testProjectionMaterializer = new TestProjectionMaterializer(system) with OffsetStoreProvider
val metaDate = TestMetaData(OffsetDateTime.now(), None, None)
val metaDate = TestMetaData(OffsetDateTime.now(), None, None)

"The testable projection" must {

val testAggregateRootId1 = "arTest1"
val evt1 = InitialStateCreated(metaDate, testAggregateRootId1, "initialState")
val evt2 = StateUpdated(metaDate, testAggregateRootId1, "updatedState")

val fixture = TestableProjection.given(Seq(evt1, evt2), Set("ar-test", "aggregate"))
val testProjectionMaterializer = new TestProjectionMaterializer(system) with OffsetStoreProvider
val testAggregateRootId1 = "arTest1"
val evt1 = InitialStateCreated(metaDate, testAggregateRootId1, "initialState")
val evt2 = StateUpdated(metaDate, testAggregateRootId1, "updatedState")
val fixture = TestableProjection.given(Seq(evt1, evt2), Set("ar-test", "aggregate"))

"Store basic events at the start" in {

whenReady(fixture.startProjection(testProjectionMaterializer)) { replayResult =>
logger.info("replayResult: {}", replayResult)
assert(replayResult.offset == Some(Sequence(2L)))
Expand All @@ -53,13 +50,30 @@ class TestableProjectionSpec extends AsyncWordSpecLike with Matchers with ScalaF

"Store more events" in {
val evt3 = StateUpdated(metaDate, testAggregateRootId1, "morestate")
whenReady(fixture.addEvent(evt3)) { result => testProjectionMaterializer.events.size should be(3) }
}

fixture.addEvent(evt3)
"Store more events slowly" in {
val evt4 = SlowStateUpdated(metaDate, testAggregateRootId1, "morestate", 1000L)
whenReady(fixture.addEvent(evt4)) { result => testProjectionMaterializer.events.size should be(4) }
}

testProjectionMaterializer.events.size should be(3)
"Store even more events slowly" in {
val evt5 = SlowStateUpdated(metaDate, testAggregateRootId1, "morestate2", 1000L)
val evt6 = SlowStateUpdated(metaDate, testAggregateRootId1, "morestate3", 1000L)
whenReady(fixture.addEvents(Seq(evt5, evt6))) { result => testProjectionMaterializer.events.size should be(6) }
}

"Store even more and more events slowly" in {
val evt7 = SlowStateUpdated(metaDate, testAggregateRootId1, "morestate4", 2000L)
val evt8 = SlowStateUpdated(metaDate, testAggregateRootId1, "morestate5", 2000L)
val evt9 = SlowStateUpdated(metaDate, testAggregateRootId1, "morestate6", 2000L)
whenReady(fixture.addEvents(Seq(evt7, evt8, evt9))) { result =>
testProjectionMaterializer.events.size should be(9)
}
}
}

}

class TestProjectionMaterializer(actorSystem: ActorSystem) extends AbstractReplayableEventMaterializer(actorSystem) {
Expand Down Expand Up @@ -91,6 +105,10 @@ class TestProjectionMaterializer(actorSystem: ActorSystem) extends AbstractRepla
case event: StateUpdated =>
events = events :+ event
Future.successful(Done)
case event: SlowStateUpdated =>
Thread.sleep(event.waited)
events = events :+ event
Future.successful(Done)
case _ => Future.successful(Done)
}
} catch {
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.5.2
sbt.version=1.5.5
2 changes: 1 addition & 1 deletion version.sbt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@

version in ThisBuild := "0.3.4"
ThisBuild / version := "0.3.5"

0 comments on commit 80b3223

Please sign in to comment.