Skip to content
This repository has been archived by the owner on Feb 6, 2024. It is now read-only.

Commit

Permalink
feat: support manipulate the shard affinities
Browse files Browse the repository at this point in the history
  • Loading branch information
ShiKaiWi committed Oct 7, 2023
1 parent a1a3483 commit f25bf8e
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
31 changes: 29 additions & 2 deletions server/coordinator/scheduler/rebalanced_shard_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"context"
"fmt"
"strings"
"sync"

"github.com/CeresDB/ceresmeta/pkg/assert"
"github.com/CeresDB/ceresmeta/server/cluster/metadata"
Expand All @@ -20,18 +21,44 @@ type RebalancedShardScheduler struct {
factory *coordinator.Factory
nodePicker coordinator.NodePicker
procedureExecutingBatchSize uint32

// `mu` protect the following fields.
mu sync.Mutex
shardAffinityRule map[storage.ShardID]ShardAffinity
}

func NewRebalancedShardScheduler(logger *zap.Logger, factory *coordinator.Factory, nodePicker coordinator.NodePicker, procedureExecutingBatchSize uint32) Scheduler {
return RebalancedShardScheduler{
return &RebalancedShardScheduler{
logger: logger,
factory: factory,
nodePicker: nodePicker,
procedureExecutingBatchSize: procedureExecutingBatchSize,
mu: sync.Mutex{},
shardAffinityRule: make(map[storage.ShardID]ShardAffinity),
}
}

func (r *RebalancedShardScheduler) AddShardAffinityRule(ctx context.Context, rule ShardAffinityRule) error {
r.mu.Lock()
defer r.mu.Unlock()

for _, shardAffinity := range rule.Affinities {
r.shardAffinityRule[shardAffinity.ShardID] = shardAffinity
}

return nil
}

func (r *RebalancedShardScheduler) RemoveShardAffinityRule(ctx context.Context, shardID storage.ShardID) error {
r.mu.Lock()
defer r.mu.Unlock()

delete(r.shardAffinityRule, shardID)

return nil
}

func (r RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
func (r *RebalancedShardScheduler) Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error) {
// RebalancedShardScheduler can only be scheduled when the cluster is not empty.
if clusterSnapshot.Topology.ClusterView.State == storage.ClusterStateEmpty {
return ScheduleResult{}, nil
Expand Down
12 changes: 12 additions & 0 deletions server/coordinator/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/CeresDB/ceresmeta/server/cluster/metadata"
"github.com/CeresDB/ceresmeta/server/coordinator/procedure"
"github.com/CeresDB/ceresmeta/server/storage"
)

type ScheduleResult struct {
Expand All @@ -15,7 +16,18 @@ type ScheduleResult struct {
Reason string
}

type ShardAffinity struct {
ShardID storage.ShardID `json:"shardID"`
NumAllowedOtherShards uint `json:"numAllowedOtherShards"`
}

type ShardAffinityRule struct {
Affinities []ShardAffinity
}

type Scheduler interface {
// Schedule will generate procedure based on current cluster snapshot, which will be submitted to ProcedureManager, and whether it is actually executed depends on the current state of ProcedureManager.
Schedule(ctx context.Context, clusterSnapshot metadata.Snapshot) (ScheduleResult, error)
AddShardAffinityRule(ctx context.Context, rule ShardAffinityRule) error
RemoveShardAffinityRule(ctx context.Context, shard_id storage.ShardID) error
}

0 comments on commit f25bf8e

Please sign in to comment.