Skip to content

Commit

Permalink
Change the logic to use cluster name that is made of connected nodes …
Browse files Browse the repository at this point in the history
…instead of also shards
  • Loading branch information
donhardman committed Sep 18, 2023
1 parent 34477a8 commit f859a28
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 94 deletions.
12 changes: 12 additions & 0 deletions src/Sharding/Cluster.php
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ public function create(?Queue $queue = null): int {
return $this->runQuery($queue, $query);
}

/**
* When we have rf=2 and/or cluster with 2 nodes
* while one is dead we need to remove it
* to make it we need to make it safe first
* @param ?Queue $queue
* @return int
*/
public function makePrimary(?Queue $queue = null): int {
$query = "SET CLUSTER {$this->name} GLOBAL 'pc.bootstrap' = 1";
return $this->runQuery($queue, $query);
}

/**
* Remove the cluster, we should run it on one
* Another will catch up
Expand Down
204 changes: 110 additions & 94 deletions src/Sharding/Table.php
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,18 @@ public function shard(

$schema = $this->configureNodeShards($shardCount, $replicationFactor);
$reduceFn = function (Map $clusterMap, array $row) use ($queue, $replicationFactor, &$nodes, &$nodeShardsMap) {
/** @var Map<string,Set<string>> $clusterMap */
$nodes->add($row['node']);
$nodeShardsMap[$row['node']] = $row['shards'];

foreach ($row['shards'] as $shard) {
$connectedNodes = $this->getConnectedNodes(new Set([$shard]));
$nodesToJoin = $connectedNodes->filter(fn($node) => $node !== $row['node']);

if ($replicationFactor > 1) {
$clusterMap = $this->handleReplication(
$row['node'],
$queue,
$connectedNodes,
$nodesToJoin,
$clusterMap,
$shard
);
Expand Down Expand Up @@ -234,37 +233,40 @@ public function shard(
* @param string $node
* @param Queue $queue
* @param Set<string> $connectedNodes
* @param Set<string> $nodesToJoin
* @param Map<string,string> $clusterMap
* @param Map<string,Set<string>> $clusterMap
* @param int $shard
* @return Map<string,string> The cluster map that we use
* @return Map<string,Set<string>> The cluster map that we use
* for session to maintain which nodes are connected
* and which cluster are processed already and who is the owner
*/
protected function handleReplication(
string $node,
Queue $queue,
Set $connectedNodes,
Set $nodesToJoin,
Map $clusterMap,
int $shard
): Map {
$clusterName = $this->getClusterName($connectedNodes, $shard);
if (!isset($clusterMap[$clusterName])) {
$cluster = new Cluster($this->client, $clusterName, $node);
$clusterMap[$clusterName] = $node;
$clusterName = $this->getClusterName($connectedNodes);
$hasCluster = isset($clusterMap[$clusterName]);
$cluster = new Cluster($this->client, $clusterName, $node);
if (!$hasCluster) {
$nodesToJoin = $connectedNodes->filter(
fn ($connectedNode) => $connectedNode !== $node
);
$clusterMap[$clusterName] = new Set;
$waitForId = $cluster->create($queue);
$queue->setWaitForId($waitForId);
$cluster->addNodeIds($queue, ...$nodesToJoin);
$queue->resetWaitForId();
}

$table = $this->getTableShardName($shard);
if (!$clusterMap->get($clusterName)->contains($table)) {
$clusterMap->get($clusterName)->add($table);
$sql = $this->getCreateTableShardSQL($shard);
$queue->add($node, $sql);
$table = $this->getTableShardName($shard);
$cluster->addTables($queue, $table);
$queue->resetWaitForId();
}

return $clusterMap;
}

Expand All @@ -278,95 +280,105 @@ protected function handleReplication(
*/
// @phpcs:ignore SlevomatCodingStandard.Complexity.Cognitive.ComplexityTooHigh
public function rebalance(Queue $queue): void {
/** @var Map<string,string> */
$clusterMap = new Map;

$schema = $this->getShardSchema();
$allNodes = $this->cluster->getNodes();
$inactiveNodes = $this->cluster->getInactiveNodes();
if (!$inactiveNodes->count()) {
return;
}
$activeNodes = $allNodes->diff($inactiveNodes);
$newSchema = Util::rebalanceShardingScheme($schema, $activeNodes);
try {
/** @var Map<string,Set<string>> */
$clusterMap = new Map;

$schema = $this->getShardSchema();
$allNodes = $this->cluster->getNodes();
$inactiveNodes = $this->cluster->getInactiveNodes();
if (!$inactiveNodes->count()) {
return;
}
$activeNodes = $allNodes->diff($inactiveNodes);
$newSchema = Util::rebalanceShardingScheme($schema, $activeNodes);

// Detect shard to nodes map with alive schema
$shardNodesMap = $this->getShardNodesMap(
$schema->filter(
fn ($row) => !$inactiveNodes->contains($row['node'])
)
);
// Detect shard to nodes map with alive schema
$shardNodesMap = $this->getShardNodesMap(
$schema->filter(
fn ($row) => !$inactiveNodes->contains($row['node'])
)
);

$affectedSchema = $schema->filter(
fn ($row) => $inactiveNodes->contains($row['node'])
);
foreach ($affectedSchema as $row) {
// First thing first, remove from inactive node using the queue
$this->cleanUpNode($queue, $row['node'], $row['shards']);
// Preload current cluster map with configuration
foreach ($shardNodesMap as $shard => $connections) {
$clusterName = $this->getClusterName($connections);
$clusterMap[$clusterName] ??= new Set;
$clusterMap->get($clusterName)->add(...$connections);
}

// Do real rebaliance now
foreach ($row['shards'] as $shard) {
/** @var Set<string> */
$nodesForShard = new Set;
$affectedSchema = $schema->filter(
fn ($row) => $inactiveNodes->contains($row['node'])
);
foreach ($affectedSchema as $row) {
// First thing first, remove from inactive node using the queue
$this->cleanUpNode($queue, $row['node'], $row['shards']);

foreach ($newSchema as $newRow) {
// The case when this shards should not be here
if (!$newRow['shards']->contains($shard)) {
continue;
// Do real rebaliance now
foreach ($row['shards'] as $shard) {
/** @var Set<string> */
$nodesForShard = new Set;

foreach ($newSchema as $newRow) {
// The case when this shards should not be here
if (!$newRow['shards']->contains($shard)) {
continue;
}
$nodesForShard->add($newRow['node']);
}
$nodesForShard->add($newRow['node']);
}

// This is exception, actually
if (!$nodesForShard->count()) {
continue;
}
// This is exception, actually
if (!$nodesForShard->count()) {
continue;
}

// It's very important here to start replication
// From the live node first due to
// cluster should be created there first
// We use previously generated shard to nodes map
/** @var Set<string> */
$shardNodes = $shardNodesMap[$shard];
// It's very important here to start replication
// From the live node first due to
// cluster should be created there first
// We use previously generated shard to nodes map
/** @var Set<string> */
$shardNodes = $shardNodesMap[$shard];

// If this happens, we have no alive shard, this is critical, but do nothing for now
if (!$shardNodes->count()) {
continue;
// If this happens, we have no alive shard, this is critical, but do nothing for now
if (!$shardNodes->count()) {
continue;
}
$shardNodes->sort();
$aliveNode = $shardNodes->first();
/** @var Set<string> */
$connectedNodes = $nodesForShard->merge($shardNodes);
// $connectedNodes->sort(
// fn ($a, $b) => $a === $aliveNode ? -1 : 1,
// );

// Reconfigure on alive shard
// Cuz it's the main shard where we have data already
$clusterMap = $this->handleReplication(
$aliveNode,
$queue,
$connectedNodes,
$clusterMap,
$shard
);
}
$shardNodes->sort();
$aliveNode = $shardNodes->first();
/** @var Set<string> */
$connectedNodes = $nodesForShard->merge($shardNodes);
// $connectedNodes->sort(
// fn ($a, $b) => $a === $aliveNode ? -1 : 1,
// );

// Reconfigure on alive shard
// Cuz it's the main shard where we have data already
$clusterMap = $this->handleReplication(
$aliveNode,
$queue,
$connectedNodes,
$nodesForShard,
$clusterMap,
$shard
);
}
}

/** @var Set<int> */
$queueIds = new Set;
foreach ($newSchema as $row) {
$sql = "DROP TABLE {$this->name}";
$queueId = $queue->add($row['node'], $sql);
$queueIds->add($queueId);
$queueIds = new Set;
foreach ($newSchema as $row) {
$sql = "DROP TABLE {$this->name}";
$queueId = $queue->add($row['node'], $sql);
$queueIds->add($queueId);

$sql = $this->getCreateShardedTableSQL($row['shards']);
$queueId = $queue->add($row['node'], $sql);
$queueIds->add($queueId);
}

$sql = $this->getCreateShardedTableSQL($row['shards']);
$queueId = $queue->add($row['node'], $sql);
$queueIds->add($queueId);
$this->updateScheme($newSchema);
} catch (\Throwable $t) {
var_dumP($t->getMessage());
}

$this->updateScheme($newSchema);
}

/**
Expand All @@ -379,12 +391,14 @@ public function rebalance(Queue $queue): void {
public function cleanUpNode(Queue $queue, string $nodeId, Set $shards): static {
// Delete distributed table
var_dump("Clean up node $nodeId");
/** @var Set<string> $removedClusters list of clusters that we will delete */
$removedClusters = new Set;
$queue->add($nodeId, "DROP TABLE {$this->name}");
foreach ($shards as $shard) {
var_dump('REMOVE SHARD', $shard);
// First remove cluster, due to we need to detach tables first
$connections = $this->getConnectedNodes(new Set([$shard]));
$clusterName = $this->getClusterName($connections, $shard);
$clusterName = $this->getClusterName($connections);
$table = $this->getTableShardName($shard);
var_dump('connections', json_encode($connections));
// Now detach table from all connections
Expand All @@ -398,11 +412,12 @@ public function cleanUpNode(Queue $queue, string $nodeId, Set $shards): static {
$connectedNode
);
var_dump('removing table', $table);
$cluster->makePrimary($queue);
$cluster->removeTables($queue, $table);
}

// We run it on active node, not down one
if (isset($cluster)) {
if (isset($cluster) && !$removedClusters->contains($cluster->name)) {
var_dump('cluter isset!');
var_dumP(json_encode($cluster));
// We need to fire delete cluster once
Expand All @@ -413,6 +428,8 @@ public function cleanUpNode(Queue $queue, string $nodeId, Set $shards): static {
->setWaitForId($queueId)
->add($nodeId, "DROP TABLE {$table}");
$queue->resetWaitForId();

$removedClusters->add($cluster->name);
}
unset($cluster);
}
Expand Down Expand Up @@ -469,13 +486,12 @@ protected function getAvailableNodes(
}

/**
* Get the unique key for the cluster based on the connections and the shard
* Get the unique key for the cluster based on the connections
* @param Set<string> $connections
* @param int $shard
* @return string
*/
protected function getClusterName(Set $connections, int $shard): string {
$hash = md5("{$this->name}:{$shard}:{$connections->sorted()->join(',')}");
protected function getClusterName(Set $connections): string {
$hash = md5($connections->sorted()->join(','));
if (is_numeric($hash[0])) {
$hash[0] = chr(97 + ($hash[0] % 6));
}
Expand Down

0 comments on commit f859a28

Please sign in to comment.