https://groups.google.com/forum/#!topic/akka-user/MO-4XhwhAN0
https://ringpop.readthedocs.io/en/latest/architecture_design.html
The state of our system usually consists of multiple addressable entities, which are replicated for higher availability and resiliency. However, the entire state is usually too big to fit on any single node, so it is dynamically partitioned across the cluster. How do we tell which node holds the entity identified by a certain key?
-
The most naive approach would be to ask a subset of nodes, hoping that at least one of them has the data we are looking for. Given a cluster of N nodes and entities replicated R times, we should on average reach our resource after asking about (N/R)+1 nodes (e.g. with N = 9 and R = 3, about 4 nodes).
-
Another way is to keep a registry of the current location of every single entity in the system in one single place. Since this approach doesn't scale well in theory, in practice we group and co-locate entities within partitions, compressing the registry to store information about entire partitions rather than individual entities. In this case, an entity's address is a composite key: a (shardId, entityId) tuple. This is how e.g. akka-cluster-sharding or riak-core works. Frequently, some subset of hot (frequently used) partitions may be cached on each node to reduce the number of requests to the central registry, or the registry itself may be a replicated store. A sketch of this addressing scheme follows.
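To make the (shardId, entityId) scheme concrete, here is a hedged sketch using akka-cluster-sharding's classic message extractors. The Envelope type and shard count are illustrative assumptions, not the project's code.

```scala
// Sketch: the shard id compresses the registry from per-entity to
// per-partition granularity.
import akka.cluster.sharding.ShardRegion

final case class Envelope(entityId: String, payload: Any) // assumed message wrapper

val numberOfShards = 64 // assumed; many entities map to each shard

// entityId identifies the entity inside its shard
val extractEntityId: ShardRegion.ExtractEntityId = {
  case env @ Envelope(entityId, _) => (entityId, env)
}

// shardId maps many entities to one partition
val extractShardId: ShardRegion.ExtractShardId = {
  case Envelope(entityId, _) => Math.floorMod(entityId.hashCode, numberOfShards).toString
}
```
-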
We could also use a distributed hash table: the entity key is hashed and mapped to the specific node responsible for the subset of the key space the hash falls into. Sometimes this means we miss a node on the first try because the cluster state is changing, and more hops are needed. Although Apache Cassandra and ScyllaDB are known for using this approach (at least at the time this text was written), it is a source of many errors (https://www.slideshare.net/ScyllaDB/scylla-summit-2022-making-schema-changes-safe-with-raft-251141793).
In this project, although the RingMaster holds a distributed hash ring of hashed keys, it is also deployed as a cluster singleton, so it is a combination of points 2 and 3 from above.
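To make the ring concrete, here is a minimal consistent-hashing sketch. It is illustrative only: the token count, MD5-based hash and the HashRing name are assumptions, not the project's actual implementation.

```scala
// Sketch: each shard name owns several tokens; a key is routed to the shard
// owning the first token clockwise from the key's hash.
import java.security.MessageDigest
import scala.collection.immutable.SortedMap

final case class HashRing(tokens: SortedMap[Long, String] = SortedMap.empty) {

  def add(shardName: String, numTokens: Int = 8): HashRing =
    copy(tokens = tokens ++ (0 until numTokens).map(i => hash(s"$shardName-$i") -> shardName))

  // first token >= hash(key), wrapping around to the smallest token
  def lookup(key: String): Option[String] =
    tokens.minAfter(hash(key)).orElse(tokens.headOption).map(_._2)

  private def hash(s: String): Long =
    MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8"))
      .take(8).foldLeft(0L)((acc, b) => (acc << 8) | (b & 0xffL))
}

// e.g. HashRing().add("alpha").add("betta").add("gamma").lookup("device-1")
```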
akka-cluster-sharding enables running at most one instance of a particular actor in the cluster, acting as a consistency boundary at any point in time. That's exactly what we want. Each process knows its shard name from the start (i.e. alpha, betta or gamma). It uses the given shard name as a cluster role and starts sharding on the nodes that belong to that role. Moreover, it allocates only one instance of DeviceDigitalTwin per node, as we have a one-to-one mapping between the shard and the entity.
For example, on each node that belongs to the alpha role, we allocate only one sharded entity. A combination of host IP and port is used as the shard (entity) name, which guarantees that each node runs only one instance of the sharded entity (see the sketch below).
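A hedged sketch of that wiring in Akka Typed. It assumes DeviceDigitalTwin exposes a Command protocol and an apply(entityId) factory; the object and key names are made up for illustration.

```scala
// Sketch: using the shard name as the cluster role confines
// DeviceDigitalTwin instances to that role's nodes.
import akka.actor.typed.ActorSystem
import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey}

object DeviceSharding {
  val TypeKey: EntityTypeKey[DeviceDigitalTwin.Command] =
    EntityTypeKey[DeviceDigitalTwin.Command]("device")

  def init(system: ActorSystem[_], shardName: String) =
    ClusterSharding(system).init(
      Entity(TypeKey)(ctx => DeviceDigitalTwin(ctx.entityId)) // entityId = "host-port", one per node
        .withRole(shardName) // allocate only on nodes that carry this role
    )
}
```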
Each node starts an HTTP API, so the next question is how to decide where incoming requests should be routed. For that purpose, we have the RingMaster actor. As processes join the cluster, they take ownership of token ranges on a hash ring based on their shard name. RingMaster, deployed as a cluster singleton, holds the hash ring and redirects all incoming requests to the appropriate shard region.
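Deploying it as a singleton is a one-liner with Akka Typed; a sketch (assuming RingMaster() is the actor's Behavior factory):

```scala
// Sketch: at most one RingMaster instance runs in the whole cluster.
import akka.actor.typed.ActorSystem
import akka.cluster.typed.{ClusterSingleton, SingletonActor}

def startRingMaster(system: ActorSystem[_]) =
  ClusterSingleton(system).init(SingletonActor(RingMaster(), "ring-master"))
```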
Therefore, if we have the shard names alpha, betta and gamma, we also have 3 independently running shard regions, none of which knows anything about the others. Each shard region is used only inside a particular shard, and each sharded entity becomes a replica of the shard.
In other words, each shard becomes its own distributed system, as each sharded entity inside the shard runs its own independent replicator.
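One way to get an independent replicator per shard is to scope a Distributed Data replicator to the shard's role; a hedged sketch using the classic API (the function and actor names are assumptions):

```scala
// Sketch: restricting the replicator to the shard's role makes each shard
// its own replication group.
import akka.actor.{ActorRef, ActorSystem}
import akka.cluster.ddata.{Replicator, ReplicatorSettings}

def startShardReplicator(system: ActorSystem, shardName: String): ActorRef = {
  val settings = ReplicatorSettings(system).withRole(shardName) // gossip only within this shard
  system.actorOf(Replicator.props(settings), s"replicator-$shardName")
}
```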
Keywords: scale, consistency and failover.
- Balance resources (memory, disk space, network traffic) across multiple nodes for scalability.
- Distribute entities and data across many nodes in the cluster
- Location transparency: interact via a logical identifier rather than a physical location, which can change over time.
- Automatic relocation on failure or rebalancing.
Shards are distributed across shard regions. A persistent entity actor is a stateful, long-lived, addressable entity. It limits the scope of contention: contention is isolated to a single entity via its unique identifier.
Akka Cluster Sharding sits on top of Akka Cluster. It distributes data in shards and balances load across the members of a cluster without developers needing to keep track of where data actually resides. Data is stored in actors that represent individual entities, identified by a unique key, which closely corresponds to an Aggregate Root in Domain-Driven Design terminology.
https://martinfowler.com/articles/patterns-of-distributed-systems/fixed-partitions.html
When we add a new shard, say betta, to an operational cluster of 2 alpha nodes ([alpha -> 127.0.0.1-2551, 127.0.0.2-2551]), we need to transfer the data that from now on is associated with betta: [alpha -> 127.0.0.1-2551, 127.0.0.2-2551, betta -> 127.0.0.10-2551].
The docker-compose2.yml by default runs:
- 2 replicas for alpha
- 1 replica for betta
- 1 replica for gamma
Later you can scale the number of replicas for these shards up and down. All replicas that share a shard name are responsible for the same range of keys, and each runs its own independent replicator.
-
First of all, we need to export these env vars
export SEED_DNS=master
export HTTP_PORT=9000
export AKKA_PORT=2551
export HOST=192.168.178.10
or export HOST=192.168.178.11
export TZ=UTC
export DM=config
-
Build and publish the image
sbt -DSHARD=docker docker
-
Start one seed node and one worker node
docker-compose -f docker-compose2.yml up -d
-
Scale up the number of workers
docker-compose -f docker-compose2.yml scale alpha=3
-
Scale down the number of workers
docker-compose -f docker-compose2.yml scale alpha=2
-
Stop all processes
docker-compose -f docker-compose2.yml stop
-
Delete all processes
docker-compose -f docker-compose2.yml rm
-
Alternatively, to run the nodes locally without docker, add loopback aliases and start the processes:
sudo ifconfig lo0 127.0.0.2 add
sudo ifconfig lo0 127.0.0.3 add
sbt -DSHARD=a runA0
sbt -DSHARD=a runA1
sbt -DSHARD=a runA2
sbt -DSHARD=b runB0
sbt -DSHARD=b runB1
sbt -DSHARD=b runB2
sbt -DSHARD=g runG0
sbt -DSHARD=g runG1
sbt -DSHARD=g runG2
http GET 192.168.178.10:9000/view
http GET 192.168.77.106:9000/view1
http://192.168.77.106:9000/chord
http GET :9000/cluster/members
curl --no-buffer :9000/metrics
http GET :9000/device/1
http 127.0.0.1:9000/shards
For docker to show all IPs:
docker inspect -f '{{.Name}} - {{.NetworkSettings.IPAddress }}' $(docker ps -aq)
For docker-compose to show all IPs:
docker inspect -f '{{.Name}} - {{range .NetworkSettings.Networks}}{{.IPAddress}}{{end}}' $(docker ps -aq)
https://docs.docker.com/compose/compose-file/#ipv4-address-ipv6-address
https://www.digitalocean.com/community/tutorials/how-to-provision-and-manage-remote-docker-hosts-with-docker-machine-on-ubuntu-16-04
docker-compose rm seed
docker-compose rm worker
docker network ls
docker network rm bfb14b518775 a671ca262355
docker pause asdgfasd
https://speedcom.github.io/dsp2017/2017/04/13/justindb-replication-and-partitioning.html
https://doc.akka.io/docs/akka/2.5.25/typed/cluster-sharding.html
https://doc.akka.io/docs/akka/current/distributed-data.html
https://doc.akka.io/docs/akka-management/current/cluster-http-management.html
ExternalShardAllocation https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html?_ga=2.193469741.1478281344.1585435561-801666185.1515340543#external-shard-allocation
https://blog.knoldus.com/introduction-to-akka-cluster-sharding/
https://medium.com/bestmile/orchestrating-startup-and-shutdown-in-scala-f7ad2644835a
a) Down, but do not terminate, nodes on both sides of your partition.
- Find target
lsof -i :2551 | grep LISTEN | awk '{print $2}'
- Suspend the process
kill -STOP <pid>
- Send Down via the akka-http-management interface
curl -w '\n' -X PUT -H 'Content-Type: multipart/form-data' -F operation=down http://localhost:2651/cluster/members/[email protected]:2552
- Resume the process
kill -CONT <pid>
b) docker pause
c) Bring down all seed nodes, leaving only non-seed nodes, and then start the seed nodes again. They will form a new cluster.
d) Incomplete coordinated shutdown.
e) Unresponsive applications due to long GC pauses.
or
https://github.com/hseeberger/akkluster
To create network partitions, we need to connect to a running container and block traffic:
docker run -d --cap-add NET_ADMIN ...
docker exec -i -t ... bash
iptables -A INPUT -p tcp -j DROP   # drop all incoming TCP traffic
iptables -D INPUT -p tcp -j DROP   # delete the rule to restore traffic
https://docs.docker.com/compose/compose-file/#resources
https://github.com/chbatey/docker-jvm-akka/blob/master/docker-compose.yml
http://www.batey.info/docker-jvm-k8s.html
https://merikan.com/2019/04/jvm-in-a-container/
https://www.lightbend.com/blog/cpu-considerations-for-java-applications-running-in-docker-and-kubernetes
https://dzone.com/articles/docker-container-resource-management-cpu-ram-and-i
Goals: lossless deployment, back-pressure throughout the whole req/res pipeline.
Let's say we have 2 nodes, Alice and Bob. Both host actors and accept incoming requests via a REST API. When we shut down either node, we could lose requests.
In particular, if we shut down Bob while actors on Bob haven't yet replied to actors on Alice, the requests that started on Alice won't be completed. The solution is that leaving nodes should drain both their incoming and outgoing channels.
Draining the incoming (local) request channel (what CoordinatedShutdown gives you) is not enough.
Streamee models long-running processes, and those processes are suitably hooked into CoordinatedShutdown so that they no longer accept new requests and delay the shutdown until all accepted requests have been processed (across the whole req/resp pipeline).
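The idea can be sketched without Streamee's API. This is a hedged, minimal version (the type and task names are made up): track in-flight requests and register a CoordinatedShutdown task that stops accepting new requests and completes once the count drains to zero.

```scala
// Sketch: delay shutdown until all accepted requests have been processed.
import java.util.concurrent.atomic.AtomicLong
import akka.Done
import akka.actor.{ActorSystem, CoordinatedShutdown}
import scala.concurrent.Promise

final class RequestDrainer(system: ActorSystem) {
  private val inFlight = new AtomicLong(0)
  @volatile private var accepting = true
  private val drained = Promise[Done]()

  // call before starting a request (small check-then-act race is acceptable for a sketch)
  def tryAccept(): Boolean = accepting && { inFlight.incrementAndGet(); true }

  // call when a request's response has been sent
  def complete(): Unit =
    if (inFlight.decrementAndGet() == 0 && !accepting) drained.trySuccess(Done)

  CoordinatedShutdown(system).addTask(
    CoordinatedShutdown.PhaseServiceRequestsDone, "drain-requests") { () =>
    accepting = false // reject new requests from now on
    if (inFlight.get() == 0) drained.trySuccess(Done)
    drained.future    // shutdown proceeds only when fully drained
  }
}
```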
Why is akka-cluster-sharding alone not enough? The good part is that it guarantees that if a command reaches a shard region while the target sharded entity is being rebalanced or has crashed, the command is buffered and re-routed to the entity once it's available again somewhere else. The problem is that by the time the entity is available again, the caller may already have hit its ask timeout, so we lose the response.
It buffers during rebalancing, which takes place when a node fails or when shards are moved as nodes join or leave the cluster.
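If commands are idempotent, the lost response can often be recovered by simply retrying the ask; a minimal sketch (the helper name is made up; AskTimeoutException extends java.util.concurrent.TimeoutException, so this catches both):

```scala
// Sketch: retry on timeout so a response lost to rebalancing is recovered.
import java.util.concurrent.TimeoutException
import scala.concurrent.{ExecutionContext, Future}

def askWithRetry[T](attempt: () => Future[T], retries: Int)(
    implicit ec: ExecutionContext): Future[T] =
  attempt().recoverWith {
    case _: TimeoutException if retries > 0 => askWithRetry(attempt, retries - 1)
  }
```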
https://manuel.bernhardt.io/2018/02/26/tour-akka-cluster-cluster-sharding/
https://www.youtube.com/watch?v=SrPubnOKJcQ
https://doc.akka.io/docs/akka/current/typed/cluster-sharding.html?_ga=2.193469741.1478281344.1585435561-801666185.1515340543#external-shard-allocation
Examples:
https://www.youtube.com/watch?v=imNYRPO74R8
https://github.com/tristanpenman/chordial.git
https://github.com/denis631/chord-dht.git
sbt '; set javaOptions += "-Dconfig.resource=cluster-application.conf" ; run'
sbt -J-Xms512M -J-XX:+PrintCommandLineFlags -J-XshowSettings
curl -w '\n' -X PUT -H 'Content-Type: multipart/form-data' -F operation=leave http://127.0.0.1:9000/cluster/members/akka://[email protected]:2551
- Instead of storing all ActorRefs in HashRingState, I could store only one leaseholder per shard and interact with it.
- ExternalShardAllocation to control shard allocation
- Chord, a protocol and algorithm for a peer-to-peer distributed hash table.
Upon arrival of a proxied request at its destination, the membership checksums of the sender and receiver are compared. The request is refused if the checksums differ. Mismatches are expected when nodes are entering or exiting the cluster due to deploys, added/removed capacity, or failures.
The cluster will eventually converge on one checksum, so refused requests are best handled by retrying them.
In RingMaster, we include a cluster membership (or HashRing) checksum in each outgoing message; at the destination, we compare the checksum from the incoming message with the local one.
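A hedged sketch of that guard (the envelope and function names are illustrative, not the project's actual types):

```scala
// Sketch: refuse proxied requests whose sender saw a different ring; the
// caller is expected to retry while the cluster converges.
final case class Proxied[T](payload: T, ringChecksum: Long)

def handle[T](msg: Proxied[T], localChecksum: Long)(process: T => Unit): Either[String, Unit] =
  if (msg.ringChecksum == localChecksum) Right(process(msg.payload))
  else Left("membership checksum mismatch: cluster is converging, retry later")
```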