Skip to content

Commit

Permalink
feed: reuse forecasts generators (#206)
Browse files Browse the repository at this point in the history
  • Loading branch information
gvolpe authored Dec 1, 2022
1 parent ded8e63 commit af1ec00
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 17 deletions.
19 changes: 19 additions & 0 deletions modules/domain/jvm/src/test/scala/trading/domain/generators.scala
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,25 @@ object generators:
t <- timestampGen
yield ForecastCommand.Vote(i, c, f, v, t)

def registerCommandGen_(
_id: CommandId
): Gen[ForecastCommand.Register] =
registerCommandGen.map(_.copy(id = _id))

def publishCommandGen_(
_id: CommandId,
_cid: CorrelationId,
_aid: AuthorId
): Gen[ForecastCommand.Publish] =
publishCommandGen.map(_.copy(id = _id, cid = _cid, authorId = _aid))

def voteCommandGen_(
_id: CommandId,
_cid: CorrelationId,
_fid: ForecastId
): Gen[ForecastCommand.Vote] =
voteCommandGen.map(_.copy(id = _id, cid = _cid, forecastId = _fid))

val forecastCommandGen: Gen[ForecastCommand] =
Gen.oneOf(publishCommandGen, registerCommandGen, voteCommandGen)

Expand Down
25 changes: 8 additions & 17 deletions modules/feed/src/main/scala/trading/feed/ForecastFeed.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,9 @@ object ForecastFeed:
.withShardKey(Shard[A].key)
.some

val ts = Timestamp(Instant.parse("2022-10-03T14:00:00.00Z"))

// The randomness of randomUUID() seems better than that of Gen.uuid
def makeCmdId = CommandId(UUID.randomUUID())

def cmd2(aid: AuthorId, cid: CorrelationId) =
ForecastCommand.Publish(makeCmdId, cid, aid, Symbol.EURUSD, ForecastDescription("foo"), ForecastTag.Short, ts)

def cmd3(fid: ForecastId, cid: CorrelationId) =
ForecastCommand.Vote(makeCmdId, cid, fid, VoteResult.Up, ts)

// Simulates a flow of realistic commands and events with matching IDs:
// 1. Send random Register command every 2 seconds.
// 2. On every Registered event received, send a Publish command.
Expand All @@ -53,25 +46,23 @@ object ForecastFeed:
ac: Consumer[IO, AuthorEvent]
): Stream[IO, Unit] =
val atEvents = ac.receive.evalMap { case AuthorEvent.Registered(_, cid, aid, _, _, _) =>
val cmd = cmd2(aid, cid)
IO.println(s">>> $cmd ") *> fp.send(cmd)
publishCommandGen_(makeCmdId, cid, aid).sample.traverse_ { cmd =>
IO.println(s">>> $cmd ") *> fp.send(cmd)
}
}

val fcEvents = fc.receive.evalMap {
case ForecastEvent.Published(_, cid, _, fid, _, _) =>
val cmd = cmd3(fid, cid)
IO.println(s">>> $cmd ") *> fp.send(cmd)
voteCommandGen_(makeCmdId, cid, fid).sample.traverse_ { cmd =>
IO.println(s">>> $cmd ") *> fp.send(cmd)
}
case _ => IO.unit
}

val uniqueCmds =
Stream
.repeatEval {
registerCommandGen.sample.traverse_ { cmd =>
import ForecastCommand.*
val unique = _CommandId.replace(makeCmdId)(cmd)
fp.send(unique)
}
registerCommandGen_(makeCmdId).sample.traverse_(fp.send)
}
.metered(2.seconds)
.interruptAfter(6.seconds)
Expand Down

0 comments on commit af1ec00

Please sign in to comment.