Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change replicas to instances #301

Merged
merged 2 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func checkMigrateTopology(curveadm *cli.CurveAdm, data string) error {
dcs2add[0].GetRole() != dcs2del[0].GetRole() {
return errno.ERR_REQUIRE_SAME_ROLE_SERVICES_FOR_MIGRATING
}
if len(dcs2del) != dcs2del[0].GetReplicas() {
if len(dcs2del) != dcs2del[0].GetInstances() {
return errno.ERR_REQUIRE_WHOLE_HOST_SERVICES_FOR_MIGRATING
}

Expand Down
14 changes: 7 additions & 7 deletions cli/command/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,11 @@ var (
)

type statusOptions struct {
id string
role string
host string
verbose bool
showReplicas bool
id string
role string
host string
verbose bool
showInstances bool
}

func NewStatusCommand(curveadm *cli.CurveAdm) *cobra.Command {
Expand All @@ -73,7 +73,7 @@ func NewStatusCommand(curveadm *cli.CurveAdm) *cobra.Command {
flags.StringVar(&options.role, "role", "*", "Specify service role")
flags.StringVar(&options.host, "host", "*", "Specify service host")
flags.BoolVarP(&options.verbose, "verbose", "v", false, "Verbose output for status")
flags.BoolVarP(&options.showReplicas, "show-replicas", "s", false, "Display service replicas")
flags.BoolVarP(&options.showInstances, "show-instances", "s", false, "Display service instances")

return cmd
}
Expand Down Expand Up @@ -113,7 +113,7 @@ func displayStatus(curveadm *cli.CurveAdm, dcs []*topology.DeployConfig, options
}
}

output := tui.FormatStatus(statuses, options.verbose, options.showReplicas)
output := tui.FormatStatus(statuses, options.verbose, options.showInstances)
curveadm.WriteOutln("")
curveadm.WriteOutln("cluster name : %s", curveadm.ClusterName())
curveadm.WriteOutln("cluster kind : %s", dcs[0].GetKind())
Expand Down
6 changes: 3 additions & 3 deletions configs/bs/cluster/scale-out.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
chunkserver_services:
deploy:
- host: ${machine1}
replica: 20 # 请注意这里的replica不代表存储池的副本数,而是节点上同类进程的数量,比如这里指的是chunkserver进程的数量,也就是配置的磁盘数,相关问题可以参考:https://github.com/opencurve/curveadm/issues/146
instances: 20 # 请注意这里的replica不代表存储池的副本数,而是节点上同类进程的数量,比如这里指的是chunkserver进程的数量,也就是配置的磁盘数,相关问题可以参考:https://github.com/opencurve/curveadm/issues/146
config:
- host: ${machine2}
replica: 20
instances: 20
- host: ${machine3}
replica: 20
instances: 20

14 changes: 7 additions & 7 deletions configs/bs/cluster/topology.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
kind: curvebs
global:
container_image: opencurvedocker/curvebs:v1.2
log_dir: ${home}/logs/${service_role}${service_replicas_sequence}
data_dir: ${home}/data/${service_role}${service_replicas_sequence}
log_dir: ${home}/logs/${service_role}${service_instances_sequence}
data_dir: ${home}/data/${service_role}${service_instances_sequence}
s3.nos_address: <>
s3.snapshot_bucket_name: <>
s3.ak: <>
Expand Down Expand Up @@ -36,16 +36,16 @@ mds_services:
chunkserver_services:
config:
listen.ip: ${service_host}
listen.port: 82${format_replicas_sequence} # 8200,8201,8202
data_dir: /data/chunkserver${service_replicas_sequence} # /data/chunkserver0, /data/chunksever1
listen.port: 82${format_instances_sequence} # 8200,8201,8202
data_dir: /data/chunkserver${service_instances_sequence} # /data/chunkserver0, /data/chunksever1
copysets: 100
deploy:
- host: ${machine1}
replicas: 3 # 请注意这里的replica不代表存储池的副本数,而是节点上同类进程的数量,比如这里指的是chunkserver进程的数量,也就是配置的磁盘数,相关问题可以参考:https://github.com/opencurve/curveadm/issues/146
instances: 3
- host: ${machine2}
replicas: 3
instances: 3
- host: ${machine3}
replicas: 3
instances: 3

snapshotclone_services:
config:
Expand Down
14 changes: 7 additions & 7 deletions internal/configure/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ type (
* logicalpools:
* - name: pool1
* physicalpool: pool1
* replicasnum: 3
* replicassum: 3
* copysetnum: 100
* zonenum: 3
* type: 0
Expand All @@ -119,7 +119,7 @@ type (
* ...
* pools:
* - name: pool1
* replicasnum: 3
* replicasum: 3
* copysetnum: 100
* zonenum: 3
*/
Expand All @@ -137,7 +137,7 @@ func SortDeployConfigs(dcs []*topology.DeployConfig) {
dc1, dc2 := dcs[i], dcs[j]
if dc1.GetRole() == dc2.GetRole() {
if dc1.GetHostSequence() == dc2.GetHostSequence() {
return dc1.GetReplicasSequence() < dc2.GetReplicasSequence()
return dc1.GetInstancesSequence() < dc2.GetInstancesSequence()
}
return dc1.GetHostSequence() < dc2.GetHostSequence()
}
Expand All @@ -146,7 +146,7 @@ func SortDeployConfigs(dcs []*topology.DeployConfig) {
}

func formatName(dc *topology.DeployConfig) string {
return fmt.Sprintf("%s_%s_%d", dc.GetHost(), dc.GetName(), dc.GetReplicasSequence())
return fmt.Sprintf("%s_%s_%d", dc.GetHost(), dc.GetName(), dc.GetInstancesSequence())
}

func createLogicalPool(dcs []*topology.DeployConfig, logicalPool string, poolset string) (LogicalPool, []Server) {
Expand All @@ -166,14 +166,14 @@ func createLogicalPool(dcs []*topology.DeployConfig, logicalPool string, poolset
zone = nextZone()
}

// NOTE: if we deploy chunkservers with replica feature
// and the value of replica greater than 1, we should
// NOTE: if we deploy chunkservers with instance feature
// and the value of instance greater than 1, we should
// set internal port and external port to 0 for let MDS
// attribute them as services on the same machine.
// see issue: https://github.com/opencurve/curve/issues/1441
internalPort := dc.GetListenPort()
externalPort := dc.GetListenExternalPort()
if dc.GetReplicas() > 1 {
if dc.GetInstances() > 1 {
internalPort = 0
externalPort = 0
}
Expand Down
54 changes: 27 additions & 27 deletions internal/configure/topology/dc.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,16 @@ const (

type (
DeployConfig struct {
kind string // KIND_CURVEFS / KIND_CUVREBS
id string // role_host_[name/hostSequence]_replicasSequence
parentId string // role_host_[name/hostSequence]_0
role string // etcd/mds/metaserevr/chunkserver
host string
hostname string
name string
replicas int
hostSequence int // start with 0
replicasSequence int // start with 0
kind string // KIND_CURVEFS / KIND_CUVREBS
id string // role_host_[name/hostSequence]_instancesSequence
parentId string // role_host_[name/hostSequence]_0
role string // etcd/mds/metaserevr/chunkserver
host string
hostname string
name string
instances int
hostSequence int // start with 0
instancesSequence int // start with 0

config map[string]interface{}
serviceConfig map[string]string
Expand All @@ -73,8 +73,8 @@ type (
)

// etcd_hostname_0_0
func formatId(role, host, name string, replicasSequence int) string {
return fmt.Sprintf("%s_%s_%s_%d", role, host, name, replicasSequence)
func formatId(role, host, name string, instancesSequence int) string {
return fmt.Sprintf("%s_%s_%s_%d", role, host, name, instancesSequence)
}

func formatName(name string, hostSequence int) string {
Expand Down Expand Up @@ -104,8 +104,8 @@ func newVariables(m map[string]interface{}) (*variable.Variables, error) {
return vars, nil
}

func NewDeployConfig(ctx *Context, kind, role, host, name string, replicas int,
hostSequence, replicasSequence int, config map[string]interface{}) (*DeployConfig, error) {
func NewDeployConfig(ctx *Context, kind, role, host, name string, instances int,
hostSequence, instancesSequence int, config map[string]interface{}) (*DeployConfig, error) {
// variable section
v := config[CONFIG_VARIABLE.key]
if !utils.IsStringAnyMap(v) && v != nil {
Expand Down Expand Up @@ -137,19 +137,19 @@ func NewDeployConfig(ctx *Context, kind, role, host, name string, replicas int,

name = formatName(name, hostSequence)
return &DeployConfig{
kind: kind,
id: formatId(role, host, name, replicasSequence),
parentId: formatId(role, host, name, 0),
role: role,
host: host,
name: name,
replicas: replicas,
hostSequence: hostSequence,
replicasSequence: replicasSequence,
config: config,
serviceConfig: map[string]string{},
variables: vars,
ctx: ctx,
kind: kind,
id: formatId(role, host, name, instancesSequence),
parentId: formatId(role, host, name, 0),
role: role,
host: host,
name: name,
instances: instances,
hostSequence: hostSequence,
instancesSequence: instancesSequence,
config: config,
serviceConfig: map[string]string{},
variables: vars,
ctx: ctx,
}, nil
}

Expand Down
4 changes: 2 additions & 2 deletions internal/configure/topology/dc_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,9 @@ func (dc *DeployConfig) GetRole() string { return dc.role }
func (dc *DeployConfig) GetHost() string { return dc.host }
func (dc *DeployConfig) GetHostname() string { return dc.hostname }
func (dc *DeployConfig) GetName() string { return dc.name }
func (dc *DeployConfig) GetReplicas() int { return dc.replicas }
func (dc *DeployConfig) GetInstances() int { return dc.instances }
func (dc *DeployConfig) GetHostSequence() int { return dc.hostSequence }
func (dc *DeployConfig) GetReplicasSequence() int { return dc.replicasSequence }
func (dc *DeployConfig) GetInstancesSequence() int { return dc.instancesSequence }
func (dc *DeployConfig) GetServiceConfig() map[string]string { return dc.serviceConfig }
func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables }

Expand Down
32 changes: 19 additions & 13 deletions internal/configure/topology/topology.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,12 @@ import (

type (
Deploy struct {
Host string `mapstructure:"host"`
Name string `mapstructure:"name"`
Replica int `mapstructure:"replica"` // old version
Replicas int `mapstructure:"replicas"`
Config map[string]interface{} `mapstructure:"config"`
Host string `mapstructure:"host"`
Name string `mapstructure:"name"`
Replica int `mapstructure:"replica"` // old version
Replicas int `mapstructure:"replicas"` // old version
Instances int `mapstructure:"instances"`
Config map[string]interface{} `mapstructure:"config"`
}

Service struct {
Expand Down Expand Up @@ -149,23 +150,28 @@ func ParseTopology(data string, ctx *Context) ([]*DeployConfig, error) {
merge(servicesConfig, deployConfig, 1)

// create deploy config
replicas := 1
instances := 1
if deploy.Replicas < 0 {
return nil, errno.ERR_REPLICAS_REQUIRES_POSITIVE_INTEGER.
return nil, errno.ERR_INSTANCES_REQUIRES_POSITIVE_INTEGER.
F("replicas: %d", deploy.Replicas)
} else if deploy.Replica < 0 {
return nil, errno.ERR_REPLICAS_REQUIRES_POSITIVE_INTEGER.
return nil, errno.ERR_INSTANCES_REQUIRES_POSITIVE_INTEGER.
F("replica: %d", deploy.Replica)
} else if deploy.Instances < 0 {
return nil, errno.ERR_INSTANCES_REQUIRES_POSITIVE_INTEGER.
F("Instance: %d", deploy.Instances)
} else if deploy.Instances > 0 {
instances = deploy.Instances
} else if deploy.Replicas > 0 {
replicas = deploy.Replicas
instances = deploy.Replicas
} else if deploy.Replica > 0 {
replicas = deploy.Replica
instances = deploy.Replica
}

for replicasSequence := 0; replicasSequence < replicas; replicasSequence++ {
for instancesSequence := 0; instancesSequence < instances; instancesSequence++ {
dc, err := NewDeployConfig(ctx, kind,
role, deploy.Host, deploy.Name, replicas,
hostSequence, replicasSequence, utils.DeepCopy(deployConfig))
role, deploy.Host, deploy.Name, instances,
hostSequence, instancesSequence, utils.DeepCopy(deployConfig))
if err != nil {
return nil, err // already is error code
}
Expand Down
22 changes: 14 additions & 8 deletions internal/configure/topology/variables.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ type Var struct {
* ${service_role} "mds"
* ${service_host} "10.0.0.1"
* ${service_host_sequence} "1"
* ${service_replicas_sequence} "1"
* ${format_replicas_sequence} "01"
* ${service_instances_sequence} "1"
* ${format_instances_sequence} "01"
* ${service_addr} "10.0.0.1"
* ${service_port} "6666"
* ${service_client_port} "2379" (etcd)
Expand Down Expand Up @@ -94,8 +94,10 @@ var (
{name: "service_host_sequence"},
{name: "service_replica_sequence"},
{name: "service_replicas_sequence"},
{name: "service_instances_sequence"},
{name: "format_replica_sequence"},
{name: "format_replicas_sequence"},
{name: "format_instances_sequence"},
{name: "service_addr", lookup: true},
{name: "service_port"},
{name: "service_client_port", role: []string{ROLE_ETCD}},
Expand Down Expand Up @@ -174,10 +176,10 @@ func joinEtcdPeer(dcs []*DeployConfig) string {
}

hostSequence := dc.GetHostSequence()
replicaSquence := dc.GetReplicasSequence()
instanceSquence := dc.GetInstancesSequence()
peerHost := dc.GetListenIp()
peerPort := dc.GetListenPort()
peer := fmt.Sprintf("etcd%d%d=http://%s:%d", hostSequence, replicaSquence, peerHost, peerPort)
peer := fmt.Sprintf("etcd%d%d=http://%s:%d", hostSequence, instanceSquence, peerHost, peerPort)
peers = append(peers, peer)
}
return strings.Join(peers, ",")
Expand Down Expand Up @@ -245,13 +247,17 @@ func getValue(name string, dcs []*DeployConfig, idx int) string {
case "service_host_sequence":
return strconv.Itoa(dc.GetHostSequence())
case "service_replica_sequence":
return strconv.Itoa(dc.GetReplicasSequence())
return strconv.Itoa(dc.GetInstancesSequence())
case "service_replicas_sequence":
return strconv.Itoa(dc.GetReplicasSequence())
return strconv.Itoa(dc.GetInstancesSequence())
case "service_instances_sequence":
return strconv.Itoa(dc.GetInstancesSequence())
case "format_replica_sequence":
return fmt.Sprintf("%02d", dc.GetReplicasSequence())
return fmt.Sprintf("%02d", dc.GetInstancesSequence())
case "format_replicas_sequence":
return fmt.Sprintf("%02d", dc.GetReplicasSequence())
return fmt.Sprintf("%02d", dc.GetInstancesSequence())
case "format_instances_sequence":
return fmt.Sprintf("%02d", dc.GetInstancesSequence())
case "service_addr":
return utils.Atoa(dc.get(CONFIG_LISTEN_IP))
case "service_port":
Expand Down
10 changes: 5 additions & 5 deletions internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,11 @@ var (
ERR_RENDERING_VARIABLE_FAILED = EC(330007, "rendering variable failed")
ERR_CREATE_HASH_FOR_TOPOLOGY_FAILED = EC(330008, "create hash for topology failed")
// 331: configure (topology.yaml: invalid configure value)
ERR_UNSUPPORT_CLUSTER_KIND = EC(331000, "unsupport cluster kind")
ERR_NO_SERVICES_IN_TOPOLOGY = EC(331001, "no services in topology")
ERR_REPLICAS_REQUIRES_POSITIVE_INTEGER = EC(331002, "replicas requires a positive integer")
ERR_INVALID_VARIABLE_SECTION = EC(331003, "invalid variable section")
ERR_DUPLICATE_SERVICE_ID = EC(331004, "service id is duplicate")
ERR_UNSUPPORT_CLUSTER_KIND = EC(331000, "unsupport cluster kind")
ERR_NO_SERVICES_IN_TOPOLOGY = EC(331001, "no services in topology")
ERR_INSTANCES_REQUIRES_POSITIVE_INTEGER = EC(331002, "instances requires a positive integer")
ERR_INVALID_VARIABLE_SECTION = EC(331003, "invalid variable section")
ERR_DUPLICATE_SERVICE_ID = EC(331004, "service id is duplicate")
// 332: configure (topology.yaml: update topology)
ERR_DELETE_SERVICE_WHILE_COMMIT_TOPOLOGY_IS_DENIED = EC(332000, "delete service while commit topology is denied")
ERR_ADD_SERVICE_WHILE_COMMIT_TOPOLOGY_IS_DENIED = EC(332001, "add service while commit topology is denied")
Expand Down
4 changes: 2 additions & 2 deletions internal/playbook/tasks/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ func (ts *Tasks) displayStatus() func(static decor.Statistics) string {
}
}

func (ts *Tasks) displayReplica(t *task.Task) func(static decor.Statistics) string {
func (ts *Tasks) displayInstance(t *task.Task) func(static decor.Statistics) string {
total := ts.CountPtid(t.Ptid())
return func(static decor.Statistics) string {
nsucc, nskip, _ := ts.monitor.sum(static.ID)
Expand Down Expand Up @@ -149,7 +149,7 @@ func (ts *Tasks) addSubBar(t *task.Task) {
mpb.PrependDecorators(
decor.Name(" + "),
decor.Name(t.Subname()+" "),
decor.Any(ts.displayReplica(t), decor.WCSyncWidthR),
decor.Any(ts.displayInstance(t), decor.WCSyncWidthR),
decor.Name(" "),
decor.OnComplete(decor.Spinner([]string{}), ""),
decor.Any(ts.displayStatus()),
Expand Down
2 changes: 1 addition & 1 deletion internal/task/task/checker/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func joinPorts(dc *topology.DeployConfig, addresses []Address) string {
for _, address := range addresses {
ports = append(ports, strconv.Itoa(address.Port))
}
if dc.GetReplicas() > 1 { // replicas service
if dc.GetInstances() > 1 { // instances service
ports = append(ports, "...")
}
return strings.Join(ports, ",")
Expand Down
Loading