This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

refactor: avoid routing tables returning errors due to incorrect metadata #225

Merged 2 commits on Sep 4, 2023
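In a nutshell, the refactor changes the routing contract: a table whose metadata lacks a shard or node assignment no longer causes RouteTables to fail; the table is simply routed with an empty list of node shards and the caller decides what to do. A minimal, self-contained sketch of the old versus new behavior, using simplified stand-in types rather than the real ceresmeta API:

package main

import "fmt"

// router is a hypothetical, heavily simplified routing table used only for
// this sketch; it maps table names to the node shards they are assigned to.
type router struct {
	assignments map[string][]string
}

// routeOld mimics the previous contract: a table with no shard assignment
// made the whole routing call fail.
func (r *router) routeOld(table string) ([]string, error) {
	shards, ok := r.assignments[table]
	if !ok || len(shards) == 0 {
		return nil, fmt.Errorf("shard not found for table %q", table)
	}
	return shards, nil
}

// routeNew mimics the patched contract: the table is still routed, just with
// an empty list of node shards.
func (r *router) routeNew(table string) ([]string, error) {
	return r.assignments[table], nil // nil (empty) when not assigned yet
}

func main() {
	r := &router{assignments: map[string][]string{}}

	if _, err := r.routeOld("demo_table"); err != nil {
		fmt.Println("old:", err) // old: shard not found for table "demo_table"
	}

	shards, err := r.routeNew("demo_table")
	fmt.Printf("new: err=%v, node shards=%d\n", err, len(shards)) // new: err=<nil>, node shards=0
}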
40 changes: 21 additions & 19 deletions server/cluster/metadata/cluster_metadata.go
@@ -65,9 +65,9 @@
}

func (c *ClusterMetadata) GetClusterID() storage.ClusterID {
c.lock.RLock()
defer c.lock.RUnlock()

return c.clusterID
}

@@ -95,7 +95,7 @@
for _, table := range tables {
schema, ok := schemaByID[table.SchemaID]
if !ok {
c.logger.Warn("schema not exits", zap.Uint64("schemaID", uint64(table.SchemaID)))

}
tableInfos = append(tableInfos, TableInfo{
ID: table.ID,
@@ -176,12 +176,12 @@
for _, tableName := range request.TableNames {
table, exists, err := c.tableManager.GetTable(request.SchemaName, tableName)
if err != nil {
c.logger.Error("get table", zap.Error(err), zap.String("schemaName", request.SchemaName), zap.String("tableName", tableName))

return err
}

if !exists {
c.logger.Error("the table to be closed does not exist", zap.String("schemaName", request.SchemaName), zap.String("tableName", tableName))

return errors.WithMessagef(ErrTableNotFound, "table not exists, shcemaName:%s,tableName:%s", request.SchemaName, tableName)
}

@@ -190,12 +190,12 @@
}

if _, err := c.topologyManager.RemoveTable(ctx, request.OldShardID, tableIDs); err != nil {
c.logger.Error("remove table from topology")

return err
}

if _, err := c.topologyManager.AddTable(ctx, request.NewShardID, tables); err != nil {
c.logger.Error("add table from topology")

return err
}

@@ -240,7 +240,7 @@
}

func (c *ClusterMetadata) AddTableTopology(ctx context.Context, shardID storage.ShardID, table storage.Table) (CreateTableResult, error) {
c.logger.Info("add table topology start", zap.String("cluster", c.Name()), zap.String("tableName", table.Name))


// Add table to topology manager.
result, err := c.topologyManager.AddTable(ctx, shardID, []storage.Table{table})
@@ -252,7 +252,7 @@
Table: table,
ShardVersionUpdate: result,
}
c.logger.Info("add table topology succeed", zap.String("cluster", c.Name()), zap.String("result", fmt.Sprintf("%+v", ret)))

return ret, nil
}

@@ -335,7 +335,7 @@
// TODO: Consider the design of the entire cluster state, which may require refactoring.
if uint32(len(c.registeredNodesCache)) >= c.metaData.MinNodeCount && c.topologyManager.GetClusterState() == storage.ClusterStateEmpty {
if err := c.UpdateClusterView(ctx, storage.ClusterStatePrepare, []storage.ShardNode{}); err != nil {
c.logger.Error("update cluster view failed", zap.Error(err))

}
}

@@ -456,22 +456,24 @@
if err != nil {
return RouteTablesResult{}, errors.WithMessage(err, "table manager get table")
}
if exists {
// TODO: Adapt to the current implementation of the partition table, which may need to be reconstructed later.
if !table.IsPartitioned() {
tables[table.ID] = table
tableIDs = append(tableIDs, table.ID)
} else {
routeEntries[table.Name] = RouteEntry{
Table: TableInfo{
ID: table.ID,
Name: table.Name,
SchemaID: table.SchemaID,
SchemaName: schemaName,
PartitionInfo: table.PartitionInfo,
},
NodeShards: nil,
}
if !exists {
continue

}

// TODO: Adapt to the current implementation of the partition table, which may need to be reconstructed later.
if !table.IsPartitioned() {
tables[table.ID] = table
tableIDs = append(tableIDs, table.ID)
} else {
routeEntries[table.Name] = RouteEntry{
Table: TableInfo{
ID: table.ID,
Name: table.Name,
SchemaID: table.SchemaID,
SchemaName: schemaName,
PartitionInfo: table.PartitionInfo,
},
NodeShards: nil,

}
}
}
@@ -495,9 +497,9 @@
// If nodeShards length bigger than 1, randomly select a nodeShard.
nodeShardsResult := nodeShards
if len(nodeShards) > 1 {
selectIndex, err := rand.Int(rand.Reader, big.NewInt(int64(len(nodeShards))))
if err != nil {
return RouteTablesResult{}, errors.WithMessage(err, "generate random node index")
selectIndex, err2 := rand.Int(rand.Reader, big.NewInt(int64(len(nodeShards))))
if err2 != nil {
return RouteTablesResult{}, errors.WithMessage(err2, "generate random node index")

}
nodeShardsResult = []ShardNodeWithVersion{nodeShards[selectIndex.Uint64()]}
}
@@ -583,11 +585,11 @@
return c.metaData.ProcedureExecutingBatchSize
}

func (c *ClusterMetadata) GetCreateTime() uint64 {
c.lock.RLock()
defer c.lock.RUnlock()

return c.metaData.CreatedAt

}

func (c *ClusterMetadata) GetClusterState() storage.ClusterState {
@@ -637,11 +639,11 @@
}
}

func (c *ClusterMetadata) GetStorageMetadata() storage.Cluster {
c.lock.RLock()
defer c.lock.RUnlock()

return c.metaData

}

// LoadMetadata load cluster metadata from storage.
@@ -653,8 +655,8 @@
if err != nil {
return errors.WithMessage(err, "get cluster")
}
c.metaData = metadata
return nil

}

// Initialize the cluster view and shard view of the cluster.
Expand Down
7 changes: 4 additions & 3 deletions server/cluster/metadata/cluster_metadata_test.go
@@ -116,9 +116,9 @@ func testTableOperation(ctx context.Context, re *require.Assertions, m *metadata
re.True(exists)
re.Equal(testTableName, t.Name)

// Route table should return error when table metadata is not exists in any shard.
_, err = m.RouteTables(ctx, testSchema, []string{testTableName})
re.Error(err)
// Route table return empty when table not assign to any node.
routeTable, err := m.RouteTables(ctx, testSchema, []string{testTableName})
re.NoError(err)
re.Equal(0, len(routeTable.RouteEntries[testTableName].NodeShards))

// Test drop table metadata.
dropMetadataResult, err := m.DropTableMetadata(ctx, testSchema, testTableName)
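With the behavior exercised by the updated test, a caller can distinguish "table missing from the routing result" from "table known but not yet assigned to any node" by inspecting NodeShards instead of relying on an error. A short sketch of that caller-side check; the types below are simplified stand-ins that only mirror the fields the test reads (RouteEntries, NodeShards):

package main

import "fmt"

// Simplified result shapes for illustration; the real types live in the
// metadata package and carry more information.
type ShardNodeWithVersion struct{ Node string }

type RouteEntry struct {
	NodeShards []ShardNodeWithVersion
}

type RouteTablesResult struct {
	RouteEntries map[string]RouteEntry
}

// handleRoute decides what to do with a single table in the routing result.
func handleRoute(res RouteTablesResult, tableName string) string {
	entry, ok := res.RouteEntries[tableName]
	if !ok {
		return "table not found in routing result"
	}
	if len(entry.NodeShards) == 0 {
		// Previously this case surfaced as an error from RouteTables.
		return "table exists but is not assigned to any node yet"
	}
	return fmt.Sprintf("route to %s", entry.NodeShards[0].Node)
}

func main() {
	res := RouteTablesResult{RouteEntries: map[string]RouteEntry{
		"demo_table": {NodeShards: nil},
	}}
	fmt.Println(handleRoute(res, "demo_table"))  // table exists but is not assigned to any node yet
	fmt.Println(handleRoute(res, "other_table")) // table not found in routing result
}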
8 changes: 6 additions & 2 deletions server/cluster/metadata/topology_manager.go
@@ -365,14 +365,18 @@
shardViewVersions := make(map[storage.ShardID]uint64, 0)
for _, tableID := range tableIDs {
shardIDs, ok := m.tableShardMapping[tableID]
// If the table is not assigned to any shard, return an empty slice.
if !ok {
return GetShardNodesByTableIDsResult{}, ErrShardNotFound.WithCausef("table id:%d, not shard is assigned", tableID)
tableShardNodes[tableID] = []storage.ShardNode{}
continue
}

for _, shardID := range shardIDs {
shardNodes, ok := m.shardNodesMapping[shardID]
if !ok {
return GetShardNodesByTableIDsResult{}, ErrNodeNotFound.WithCausef("shard id:%d, no node is assigned", shardID)
// If the shard is not assigned to any node, return an empty slice.
tableShardNodes[tableID] = []storage.ShardNode{}
continue

}

if _, exists := tableShardNodes[tableID]; !exists {
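The topology-manager change follows the same pattern: a missing entry in the table-to-shard or shard-to-node mapping now yields an empty slice instead of ErrShardNotFound or ErrNodeNotFound. A rough, self-contained sketch of that lookup shape, with plain ints and strings standing in for the real storage types:

package main

import "fmt"

// topologySketch is a hypothetical simplification: tableShards maps a table ID
// to its shard IDs, shardNodes maps a shard ID to the nodes hosting it.
type topologySketch struct {
	tableShards map[int][]int
	shardNodes  map[int][]string
}

// shardNodesByTables mirrors the patched lookup: every requested table gets an
// entry in the result, possibly an empty one, and no missing mapping is fatal.
func (t *topologySketch) shardNodesByTables(tableIDs []int) map[int][]string {
	result := make(map[int][]string, len(tableIDs))
	for _, tableID := range tableIDs {
		shardIDs, ok := t.tableShards[tableID]
		if !ok {
			// Table not assigned to any shard yet: empty slice, no error.
			result[tableID] = []string{}
			continue
		}
		for _, shardID := range shardIDs {
			nodes, ok := t.shardNodes[shardID]
			if !ok {
				// Shard not assigned to any node yet: keep at least an empty entry.
				if _, seen := result[tableID]; !seen {
					result[tableID] = []string{}
				}
				continue
			}
			result[tableID] = append(result[tableID], nodes...)
		}
	}
	return result
}

func main() {
	topo := &topologySketch{
		tableShards: map[int][]int{1: {10}, 2: {20}},
		shardNodes:  map[int][]string{10: {"node-a"}},
	}
	fmt.Println(topo.shardNodesByTables([]int{1, 2, 3})) // map[1:[node-a] 2:[] 3:[]]
}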