Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
roll 3
Browse files Browse the repository at this point in the history
* custom ShardingMessageExtractor can be removed
  • Loading branch information
patriknw committed Jun 19, 2019
1 parent 7618fa7 commit 483f612
Showing 1 changed file with 5 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.cluster.sharding.typed.HashCodeMessageExtractor
import akka.cluster.sharding.typed.ShardingEnvelope
import akka.cluster.sharding.typed.ShardingMessageExtractor
import akka.cluster.sharding.typed.scaladsl.ClusterSharding
import akka.cluster.sharding.typed.scaladsl.Entity
import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
Expand All @@ -21,50 +18,11 @@ object Device {

def init(system: ActorSystem[_]): Unit = {

val messageExtractor =
new ShardingMessageExtractor[Any, Command] {

// Note that `HashCodeMessageExtractor` is using
// `(math.abs(id.hashCode) % numberOfShards).toString`.
// If the old Untyped nodes were using a different hashing function
// this delegate HashCodeMessageExtractor can't be used and
// same hashing function as before must be implemented here.
// `akka.cluster.sharding.typed.HashCodeMessageExtractor` is compatible
// with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
val delegate = new HashCodeMessageExtractor[Device.Command](
system.settings.config
.getInt("akka.cluster.sharding.number-of-shards")
)

override def entityId(message: Any): String = {
message match {
case Device.RecordTemperature(deviceId, _) =>
deviceId.toString
case Device.GetTemperature(deviceId, _) =>
deviceId.toString
case env: ShardingEnvelope[Device.Command] =>
delegate.entityId(env)
}
}

override def shardId(entityId: String): String = {
delegate.shardId(entityId)
}

override def unwrapMessage(message: Any): Command = {
message match {
case m: Device.RecordTemperature => m
case m: Device.GetTemperature => m
case env: ShardingEnvelope[Device.RecordTemperature] =>
delegate.unwrapMessage(env)
}
}
}

ClusterSharding(system).init(
Entity(TypeKey, _ => Device())
.withMessageExtractor(messageExtractor)
)
// If the original hashing function was using
// `(math.abs(id.hashCode) % numberOfShards).toString`
// the default HashCodeMessageExtractor in Typed can be used.
// That is also compatible with `akka.cluster.sharding.ShardRegion.HashCodeMessageExtractor`.
ClusterSharding(system).init(Entity(TypeKey, _ => Device()))
}

sealed trait Command extends Message
Expand Down

0 comments on commit 483f612

Please sign in to comment.