Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
roll 2 - scala
Browse files Browse the repository at this point in the history
* convert actors to Typed
* use custom ShardingMessageExtractor in ClusterSharding.init
* use same akka.cluster.sharding.number-of-shards config as was in roll 1
  • Loading branch information
patriknw committed Nov 6, 2019
1 parent 359e08d commit 0ceac95
Show file tree
Hide file tree
Showing 10 changed files with 337 additions and 0 deletions.
10 changes: 10 additions & 0 deletions akka-sample-sharding-typed-scala/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
Akka sample by Lightbend

Licensed under Public Domain (CC0)

To the extent possible under law, the person who associated CC0 with
this Template has waived all copyright and related or neighboring
rights to this Template.

You should have received a copy of the CC0 legalcode along with this
work. If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
39 changes: 39 additions & 0 deletions akka-sample-sharding-typed-scala/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
This tutorial contains a sample illustrating [Akka Cluster Sharding](http://doc.akka.io/docs/akka/current/scala/cluster-sharding.html#an-example).

## Example overview

First of all, make sure the correct settings in [application.conf](src/main/resources/application.conf) are set as described in the akka-sample-cluster tutorial.

Open [ShardingApp.scala](src/main/scala/sample/sharding/ShardingApp.scala).

This small program starts an ActorSystem with Cluster Sharding enabled. It joins the cluster and starts a `Devices` actor. This actor starts the infrastructure to shard `Device` instances and starts sending messages to arbitrary devices.

To run this sample, type `sbt "runMain sample.sharding.ShardingApp"` if it is not already started.

`ShardingApp` starts three actor systems (cluster members) in the same JVM process. It can be more interesting to run them in separate processes. Stop the application and then open three terminal windows.

In the first terminal window, start the first seed node with the following command:

sbt "runMain sample.sharding.ShardingApp 2551"

2551 corresponds to the port of the first seed-nodes element in the configuration. In the log output you see that the cluster node has been started and changed status to 'Up'.

You'll see a log message when `Devices` sends a message to record the current temperature, and for each of those you'll see a log message from the `Device` showing the action taken and the new average temperature.

In the second terminal window, start the second seed node with the following command:

sbt "runMain sample.sharding.ShardingApp 2552"

2552 corresponds to the port of the second seed-nodes element in the configuration. In the log output you see that the cluster node has been started and joins the other seed node and becomes a member of the cluster. Its status changed to 'Up'. Switch over to the first terminal window and see in the log output that the member joined.

Some of the devices that were originally on the `ActorSystem` on port 2551 will be migrated to the newly joined `ActorSystem` on port 2552. The migration is straightforward: the old actor is stopped and a fresh actor is started on the newly created `ActorSystem`. Notice this means the average is reset: if you want your state to be persisted you'll need to take care of this yourself. For this reason Cluster Sharding and Akka Persistence are such a popular combination.

Start another node in the third terminal window with the following command:

sbt "runMain sample.sharding.ShardingApp 0"

Now you don't need to specify the port number, 0 means that it will use a random available port. It joins one of the configured seed nodes. Look at the log output in the different terminal windows.

Start even more nodes in the same way, if you like.

Shut down one of the nodes by pressing 'ctrl-c' in one of the terminal windows. The other nodes will detect the failure after a while, which you can see in the log output in the other terminals.
33 changes: 33 additions & 0 deletions akka-sample-sharding-typed-scala/build.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import com.typesafe.sbt.SbtMultiJvm.multiJvmSettings
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys.MultiJvm

val akkaVersion = "2.6.0-M4"

lazy val `akka-sample-sharding-typed-scala` = project
.in(file("."))
.settings(multiJvmSettings: _*)
.settings(
organization := "com.typesafe.akka.samples",
scalaVersion := "2.12.8",
scalacOptions in Compile ++= Seq(
"-deprecation",
"-feature",
"-unchecked",
"-Xlog-reflective-calls",
"-Xlint"
),
javacOptions in Compile ++= Seq("-Xlint:unchecked", "-Xlint:deprecation"),
javaOptions in run ++= Seq("-Xms128m", "-Xmx1024m"),
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-cluster-sharding-typed" % akkaVersion,
"com.typesafe.akka" %% "akka-serialization-jackson" % akkaVersion,
"org.scalatest" %% "scalatest" % "3.0.7" % Test
),
mainClass in (Compile, run) := Some("sample.sharding.ShardingApp"),
// disable parallel tests
parallelExecution in Test := false,
licenses := Seq(
("CC0", url("http://creativecommons.org/publicdomain/zero/1.0"))
)
)
.configs(MultiJvm)
1 change: 1 addition & 0 deletions akka-sample-sharding-typed-scala/project/build.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
sbt.version=1.2.8
2 changes: 2 additions & 0 deletions akka-sample-sharding-typed-scala/project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
addSbtPlugin("com.dwijnand" % "sbt-dynver" % "3.0.0")
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
akka {
loglevel = INFO

actor {
provider = "cluster"

serialization-bindings {
"sample.sharding.Message" = jackson-cbor
}
}

# For the sample, just bind to loopback and do not allow access from the network
# the port is overridden by the logic in main class
remote.artery {
canonical.port = 0
canonical.hostname = 127.0.0.1
}

cluster {
seed-nodes = [
"akka://[email protected]:2551",
"akka://[email protected]:2552"]

# auto downing is NOT safe for production deployments.
# you may want to use it during development, read more about it in the docs.
auto-down-unreachable-after = 10s

sharding.number-of-shards = 100
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package sample.sharding

import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.HashCodeMessageExtractor
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey

/**
* This is just an example: cluster sharding would be overkill for just keeping a small amount of data,
* but becomes useful when you have a collection of 'heavy' actors (in terms of processing or state)
* so you need to distribute them across several nodes.
*/
object Device {
val TypeKey = EntityTypeKey[Device.Command]("Device")

def init(system: ActorSystem[_]): Unit = {

val messageExtractor =
new ShardingMessageExtractor[Any, Command] {

// Note that `HashCodeMessageExtractor` is using
// `(math.abs(id.hashCode) % numberOfShards).toString`.
// If the old Untyped nodes were using a different hashing function
// this delegate HashCodeMessageExtractor can't be used and
// same hashing function as before must be implemented here.
// `akka.cluster.sharding.typed.HashCodeMessageExtractor` is compatible
// with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
val delegate = new HashCodeMessageExtractor[Device.Command](
system.settings.config
.getInt("akka.cluster.sharding.number-of-shards")
)

override def entityId(message: Any): String = {
message match {
case Device.RecordTemperature(deviceId, _) =>
deviceId.toString
case Device.GetTemperature(deviceId, _) =>
deviceId.toString
case env: ShardingEnvelope[Device.Command] =>
delegate.entityId(env)
}
}

override def shardId(entityId: String): String = {
delegate.shardId(entityId)
}

override def unwrapMessage(message: Any): Command = {
message match {
case m: Device.RecordTemperature => m
case m: Device.GetTemperature => m
case env: ShardingEnvelope[Device.RecordTemperature] =>
delegate.unwrapMessage(env)
}
}
}

ClusterSharding(system).init(
Entity(TypeKey, _ => Device())
.withMessageExtractor(messageExtractor)
)
}

sealed trait Command extends Message

case class RecordTemperature(deviceId: Int, temperature: Double)
extends Command

case class GetTemperature(deviceId: Int, replyTo: ActorRef[Temperature])
extends Command

case class Temperature(deviceId: Int,
average: Double,
latest: Double,
readings: Int)
extends Message

def apply(): Behavior[Command] =
counting(Vector.empty)

private def counting(values: Vector[Double]): Behavior[Command] = {
Behaviors.receive { (context, cmd) =>
cmd match {
case RecordTemperature(id, temp) =>
val temperatures = values :+ temp
context.log.info(
s"Recording temperature $temp for device $id, average is ${average(temperatures)} after " +
s"${temperatures.size} readings"
)
counting(temperatures)

case GetTemperature(id, replyTo) =>
val reply =
if (values.isEmpty)
Temperature(id, Double.NaN, Double.NaN, 0)
else
Temperature(id, average(values), values.last, values.size)
replyTo ! reply
Behaviors.same
}
}
}

private def average(values: Vector[Double]): Double =
if (values.isEmpty) Double.NaN
else values.sum / values.size
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package sample.sharding

import scala.concurrent.duration._
import scala.util.Random

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.scaladsl.ClusterSharding

object Devices {
sealed trait Command

private case object UpdateDevice extends Command

private case object ReadTemperatures extends Command

private case class GetTemperatureReply(temp: Device.Temperature)
extends Command

def apply(): Behavior[Command] = {
Behaviors.setup { context =>
Device.init(context.system)
val sharding = ClusterSharding(context.system)

Behaviors.withTimers { timers =>
val random = new Random()
val numberOfDevices = 50

timers.startTimerWithFixedDelay(UpdateDevice, UpdateDevice, 1.second)
timers.startTimerWithFixedDelay(
ReadTemperatures,
ReadTemperatures,
15.seconds
)

val temperatureAdapter =
context.messageAdapter[Device.Temperature](GetTemperatureReply(_))

Behaviors.receiveMessage {
case UpdateDevice =>
val deviceId = random.nextInt(numberOfDevices)
val temperature = 5 + 30 * random.nextDouble()
val msg = Device.RecordTemperature(deviceId, temperature)
context.log.info(s"Sending $msg")
sharding.entityRefFor(Device.TypeKey, deviceId.toString) ! msg
Behaviors.same

case ReadTemperatures =>
(0 to numberOfDevices).foreach { deviceId =>
val entityRef =
sharding.entityRefFor(Device.TypeKey, deviceId.toString)
entityRef ! Device.GetTemperature(deviceId, temperatureAdapter)
}
Behaviors.same

case GetTemperatureReply(temp: Device.Temperature) =>
if (temp.readings > 0)
context.log.info(
"Temperature of device {} is {} with average {} after {} readings",
temp.deviceId,
temp.latest,
temp.average,
temp.readings
)
Behaviors.same
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/**
* Copyright (C) 2019 Lightbend Inc. <https://www.lightbend.com>
*/
package sample.sharding

/**
* Marker interface for actor messages that are serialized.
*/
trait Message
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package sample.sharding

import akka.actor.typed.ActorSystem
import com.typesafe.config.ConfigFactory

object ShardingApp {
def main(args: Array[String]): Unit = {
if (args.isEmpty)
startup(Seq("2551", "2552", "0"))
else
startup(args)
}

def startup(ports: Seq[String]): Unit = {
// In a production application you wouldn't typically start multiple ActorSystem instances in the
// same JVM, here we do it to easily demonstrate these ActorSytems (which would be in separate JVM's)
// talking to each other.
ports foreach { port =>
// Override the configuration of the port
val config = ConfigFactory
.parseString("akka.remote.artery.canonical.port=" + port)
.withFallback(ConfigFactory.load())

// Create an Akka system, with Devices actor that starts the sharding and sends random messages
ActorSystem(Devices(), "ShardingSystem", config)
}
}

}

0 comments on commit 0ceac95

Please sign in to comment.