Skip to content

Commit

Permalink
Improve Util class with huge refactoring and possibility to rebalance
Browse files Browse the repository at this point in the history
  • Loading branch information
donhardman committed Sep 14, 2023
1 parent b196b38 commit 5fcfc2f
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 25 deletions.
2 changes: 1 addition & 1 deletion src/Sharding/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ protected function configureNodeShards(
$nodes = $this->cluster->getNodes();

// First create the sharding scheme for table and system table
$scheme = Util::createShardingScheme($nodes, $shardCount, $replicationFactor);
$scheme = Util::createShardingSchema($nodes, $shardCount, $replicationFactor);
$this->updateScheme($scheme);

// Return prepared scheme before
Expand Down
252 changes: 228 additions & 24 deletions src/Sharding/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,65 +8,269 @@
use RuntimeException;

final class Util {
/**
 * Generate sharding schema by using input nodes, shards and replication
 *
 * The schema holds one row per node; every shard is placed on
 * $replicationFactor different nodes, so the factor must not exceed
 * the number of nodes.
 *
 * @param Set<string> $nodes
 * @param int $shardCount
 * @param int $replicationFactor
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 * @throws RuntimeException When there are fewer nodes than the replication factor requires
 */
public static function createShardingSchema(
    Set $nodes,
    int $shardCount,
    int $replicationFactor = 2
): Vector {
    $nodeCount = $nodes->count();
    $replicaCount = ($replicationFactor - 1);

    // Every copy of a shard goes to a separate node, so the number of extra
    // copies has to stay below the number of nodes
    if ($replicaCount >= $nodeCount) {
        throw new RuntimeException(
            "Replica count for factor of {$replicationFactor} is"
            ." greater than node count: {$replicaCount} > {$nodeCount}"
        );
    }

    $schema = self::initializeSchema($nodes);
    $nodeMap = self::initializeNodeMap($nodeCount);

    return self::assignNodesToSchema($schema, $nodeMap, $nodes, $shardCount, $replicationFactor);
}

/**
 * Create the initial schema: one row per node with no shards and no connections
 *
 * Rows are pushed in the iteration order of $nodes, so the position of a row
 * matches the position of its node in the set.
 *
 * @param Set<string> $nodes
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 */
private static function initializeSchema(Set $nodes): Vector {
    /** @var Vector<array{node:string,shards:Set<int>,connections:Set<string>}> */
    $schema = new Vector();

    foreach ($nodes as $node) {
        $schema->push(
            [
                'node' => $node,
                'shards' => new Set(),
                'connections' => new Set(),
            ]
        );
    }

    return $schema;
}

/**
 * Prepare the per-node shard counters, all starting from zero
 *
 * @param int $count Count of nodes
 * @return Map<int,int> Counter for every node position from 0 to $count - 1
 */
private static function initializeNodeMap(int $count): Map {
    $counters = new Map();

    $position = 0;
    while ($position < $count) {
        $counters[$position] = 0;
        ++$position;
    }

    return $counters;
}

/**
 * Spread every shard and its replicas over the nodes of the schema
 *
 * For each shard, $replicationFactor different nodes are picked one at a time:
 * every pick takes a random node among the least loaded nodes that do not hold
 * the shard yet. After that each picked node gets its connections replaced with
 * the names of all nodes that hold this shard.
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param Map<int,int> $nodeMap Shard counter of every node, keyed by node position
 * @param Set<string> $nodes
 * @param int $shardCount
 * @param int $replicationFactor
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 * @throws RuntimeException When no node is left to take one more copy of a shard
 */
private static function assignNodesToSchema(
    Vector $schema,
    Map $nodeMap,
    Set $nodes,
    int $shardCount,
    int $replicationFactor
): Vector {
    for ($shard = 0; $shard < $shardCount; $shard++) {
        $usedNodesInCurrentReplication = new Set();

        for ($replica = 0; $replica < $replicationFactor; $replica++) {
            // Look for the minimum only among nodes that do not hold this shard yet,
            // otherwise the list of candidates could end up empty
            $availableNodes = $nodeMap->filter(
                fn($node) => !$usedNodesInCurrentReplication->contains($node)
            );
            if ($availableNodes->isEmpty()) {
                throw new RuntimeException(
                    "No free node left for replica #{$replica} of shard #{$shard}"
                );
            }

            $shardCounts = $availableNodes->toArray();
            $candidates = array_keys($shardCounts, min($shardCounts), true);

            /** @var int $node */
            $node = $candidates[array_rand($candidates)];

            $schema->get($node)['shards']->add($shard);
            $nodeMap->put($node, $nodeMap->get($node) + 1);
            $usedNodesInCurrentReplication->add($node);
        }

        // NOTE(review): connections are replaced, not merged, so a node keeps only
        // the group of the last shard assigned to it - confirm this is intended
        foreach ($usedNodesInCurrentReplication as $node) {
            // map() is evaluated for each node, so nodes do not share one Set instance
            $schema[$node]['connections'] = $usedNodesInCurrentReplication
                ->map(fn($index) => $nodes[$index]);
        }
    }

    return $schema;
}

/**
 * Rebalance the sharding schema for the given set of nodes and return a new one
 *
 * Rows of nodes that are still in $nodes are copied, nodes missing from the schema
 * get empty rows, and shards of the nodes that are not in $nodes anymore
 * are handed over to the nodes of the new schema.
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param Set<string> $nodes
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 */
public static function rebalanceShardingScheme(Vector $schema, Set $nodes): Vector {
    // Shards to move are read from the original schema, which is not modified here
    $shardsToMove = self::findInactiveShards($schema, $nodes);

    $rebalanced = self::addNodesToSchema(
        self::copyActiveNodeAssignments($schema, $nodes),
        $nodes
    );

    return self::assignShardsToNodes($rebalanced, $shardsToMove);
}

/**
 * Copy the rows of the nodes that are present in $nodes into a new schema
 *
 * Shards and connections are cloned, so changes of the copy do not touch
 * the sets of the original schema.
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param Set<string> $nodes
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 */
private static function copyActiveNodeAssignments(Vector $schema, Set $nodes): Vector {
    /** @var Vector<array{node:string,shards:Set<int>,connections:Set<string>}> */
    $activeRows = new Vector();

    foreach ($schema as $row) {
        if ($nodes->contains($row['node'])) {
            // NOTE(review): connections are copied as is and may still name nodes
            // that are not in $nodes anymore - confirm this is intended
            $activeRows->push(
                [
                    'node' => $row['node'],
                    'shards' => clone $row['shards'],
                    'connections' => clone $row['connections'],
                ]
            );
        }
    }

    return $activeRows;
}

/**
 * Append an empty row for every node from $nodes that is missing in the schema
 *
 * The passed schema is modified in place and returned.
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param Set<string> $nodes
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 */
private static function addNodesToSchema(Vector $schema, Set $nodes): Vector {
    // Collect known node names once instead of filtering the whole schema for every node
    $knownNodes = new Set();
    foreach ($schema as $row) {
        $knownNodes->add($row['node']);
    }

    foreach ($nodes as $node) {
        if ($knownNodes->contains($node)) {
            continue;
        }

        $schema[] = [
            'node' => $node,
            'shards' => new Set(),
            'connections' => new Set(),
        ];
        $knownNodes->add($node);
    }

    return $schema;
}

/**
 * Collect the shards that belong to the schema nodes missing from $nodes
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param Set<string> $nodes
 * @return Set<int> Unique shards of all nodes that are not in $nodes
 */
private static function findInactiveShards(Vector $schema, Set $nodes): Set {
    $inactiveShards = new Set();

    foreach ($schema as $row) {
        $isActive = $nodes->contains($row['node']);
        if (!$isActive) {
            // The set keeps a shard once even when several inactive nodes held it
            $inactiveShards->add(...$row['shards']->toArray());
        }
    }

    return $inactiveShards;
}

/**
 * Hand over the given shards to the least loaded nodes of the schema
 *
 * Each shard goes to a random node among those with the fewest shards that do
 * not hold this shard yet; the connections of that node are then replaced with
 * the names of all nodes holding the shard. A shard that every node already
 * holds is skipped. The passed schema is modified in place and returned.
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param Set<int> $shards
 * @return Vector<array{node:string,shards:Set<int>,connections:Set<string>}>
 * @throws RuntimeException When there are shards to assign but the schema has no nodes
 */
private static function assignShardsToNodes(Vector $schema, Set $shards): Vector {
    $nodeMap = self::initializeNodeMapForRebalance($schema);

    // Fail with a clear error instead of letting min() choke on an empty list
    if (!$shards->isEmpty() && $nodeMap->isEmpty()) {
        throw new RuntimeException('There are no nodes in schema to assign shards to');
    }

    foreach ($shards as $shard) {
        // A node that already holds the shard must not be picked: adding the shard
        // again changes nothing while the counter of the node would still grow
        $candidates = $nodeMap->filter(
            fn($node) => !$schema->get($node)['shards']->contains($shard)
        );
        if ($candidates->isEmpty()) {
            continue;
        }

        $node = self::findNodeWithMinimumShards($candidates);
        // It will never happen, but for phpstan
        if (!isset($schema[$node])) {
            throw new RuntimeException("Inconsistency with schema node #{$node}");
        }
        $schema[$node]['shards']->add($shard);
        $nodeMap->put($node, $nodeMap->get($node) + 1);

        $schema[$node]['connections'] = self::findUsedNodesInCurrentReplication($schema, $shard);
    }

    return $schema;
}

/**
 * Build the shard counters from the current state of the schema
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @return Map<int,int> Number of shards of every node, keyed by the row position
 */
private static function initializeNodeMapForRebalance(Vector $schema): Map {
    /** @var Map<int,int> */
    $counters = new Map();

    foreach ($schema as $position => $row) {
        $counters->put($position, $row['shards']->count());
    }

    return $counters;
}

/**
 * Pick a random node among those that have the lowest shard counter
 *
 * @param Map<int,int> $nodeMap Shard counter of every node, keyed by node position
 * @return int Position of the picked node
 */
private static function findNodeWithMinimumShards(Map $nodeMap): int {
    $lowest = min($nodeMap->values()->toArray());

    $candidates = [];
    foreach ($nodeMap as $position => $shardCount) {
        if ($position < 0 || $shardCount !== $lowest) {
            continue;
        }

        $candidates[] = $position;
    }

    return $candidates[array_rand($candidates)];
}

/**
 * Find the names of all nodes of the schema that hold the given shard
 *
 * @param Vector<array{node:string,shards:Set<int>,connections:Set<string>}> $schema
 * @param int $shard
 * @return Set<string>
 */
private static function findUsedNodesInCurrentReplication(Vector $schema, int $shard): Set {
    $holders = $schema
        ->filter(fn($row) => $row['shards']->contains($shard))
        ->map(fn($row) => $row['node']);

    return new Set($holders);
}
}

0 comments on commit 5fcfc2f

Please sign in to comment.