Skip to content

Commit

Permalink
add scale support for meta (#146)
Browse files Browse the repository at this point in the history
Signed-off-by: sh2 <[email protected]>
  • Loading branch information
shawnh2 authored Sep 1, 2023
1 parent 05d9130 commit 7078dae
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 62 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/gtctl/cluster/get/get.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ func getClusterFromKubernetes(ctx context.Context, l logger.Logger, nn types.Nam
func getClusterFromBareMetal(ctx context.Context, l logger.Logger, nn types.NamespacedName, table *tablewriter.Table) error {
deployer, err := baremetal.NewDeployer(l, nn.Name, baremetal.WithCreateNoDirs())
if err != nil {
return nil
return err
}

cluster, err := deployer.GetGreptimeDBCluster(ctx, nn.Name, nil)
Expand Down
130 changes: 77 additions & 53 deletions pkg/cmd/gtctl/cluster/scale/scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package scale
import (
"context"
"fmt"
"time"

greptimedbclusterv1alpha1 "github.com/GreptimeTeam/greptimedb-operator/apis/v1alpha1"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -47,77 +48,100 @@ func NewScaleClusterCommand(l logger.Logger) *cobra.Command {
return fmt.Errorf("cluster name should be set")
}

if options.ComponentType == "" {
return fmt.Errorf("component type is required")
}

if options.ComponentType != string(greptimedbclusterv1alpha1.FrontendComponentKind) &&
options.ComponentType != string(greptimedbclusterv1alpha1.DatanodeComponentKind) {
return fmt.Errorf("component type is invalid")
}

if options.Replicas < 1 {
return fmt.Errorf("replicas should be equal or greater than 1")
}

k8sDeployer, err := k8s.NewDeployer(l)
if err != nil {
if err := validateScaleOptions(options); err != nil {
return err
}

var (
ctx = context.TODO()
clusterName = args[0]
namespace = options.Namespace
ctx = context.Background()
nn = types.NamespacedName{
Namespace: options.Namespace,
Name: args[0],
}
cancel context.CancelFunc
)

name := types.NamespacedName{
Namespace: options.Namespace,
Name: clusterName,
}.String()
cluster, err := k8sDeployer.GetGreptimeDBCluster(ctx, name, nil)
if err != nil && errors.IsNotFound(err) {
l.Errorf("cluster %s in %s not found\n", clusterName, namespace)
return nil
}
if err != nil {
return err
}

rawCluster, ok := cluster.Raw.(*greptimedbclusterv1alpha1.GreptimeDBCluster)
if !ok {
return fmt.Errorf("invalid cluster type")
if options.Timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, time.Duration(options.Timeout)*time.Second)
defer cancel()
}

var oldReplicas int32
if options.ComponentType == string(greptimedbclusterv1alpha1.FrontendComponentKind) {
oldReplicas = rawCluster.Spec.Frontend.Replicas
rawCluster.Spec.Frontend.Replicas = options.Replicas
}

if options.ComponentType == string(greptimedbclusterv1alpha1.DatanodeComponentKind) {
oldReplicas = rawCluster.Spec.Datanode.Replicas
rawCluster.Spec.Datanode.Replicas = options.Replicas
}

l.V(0).Infof("Scaling cluster %s in %s... from %d to %d\n", clusterName, namespace, oldReplicas, options.Replicas)

if err := k8sDeployer.UpdateGreptimeDBCluster(ctx, name, &deployer.UpdateGreptimeDBClusterOptions{
NewCluster: &deployer.GreptimeDBCluster{Raw: rawCluster},
}); err != nil {
if err := scaleClusterForKubernetes(ctx, options, l, nn); err != nil {
return err
}

l.V(0).Infof("Scaling cluster %s in %s is OK!\n", clusterName, namespace)

return nil
},
}

cmd.Flags().StringVarP(&options.ComponentType, "component-type", "c", "", "Component of GreptimeDB cluster, can be 'frontend' and 'datanode'.")
cmd.Flags().StringVarP(&options.ComponentType, "component", "c", "", "Component of GreptimeDB cluster, can be 'frontend', 'datanode' and 'meta'.")
cmd.Flags().StringVarP(&options.Namespace, "namespace", "n", "default", "Namespace of GreptimeDB cluster.")
cmd.Flags().Int32Var(&options.Replicas, "replicas", 0, "The replicas of component of GreptimeDB cluster.")
cmd.Flags().IntVar(&options.Timeout, "timeout", -1, "Timeout in seconds for the command to complete, default is no timeout.")

return cmd
}

func validateScaleOptions(options scaleCliOptions) error {
if options.ComponentType == "" {
return fmt.Errorf("component type is required")
}

if options.ComponentType != string(greptimedbclusterv1alpha1.FrontendComponentKind) &&
options.ComponentType != string(greptimedbclusterv1alpha1.DatanodeComponentKind) &&
options.ComponentType != string(greptimedbclusterv1alpha1.MetaComponentKind) {
return fmt.Errorf("component type is invalid")
}

if options.Replicas < 1 {
return fmt.Errorf("replicas should be equal or greater than 1")
}

return nil
}

func scaleClusterForKubernetes(ctx context.Context, options scaleCliOptions, l logger.Logger, nn types.NamespacedName) error {
k8sDeployer, err := k8s.NewDeployer(l)
if err != nil {
return err
}

cluster, err := k8sDeployer.GetGreptimeDBCluster(ctx, nn.String(), nil)
if err != nil && errors.IsNotFound(err) {
l.Errorf("cluster %s in %s not found\n", nn.Name, nn.Namespace)
return nil
}
if err != nil {
return err
}

rawCluster, ok := cluster.Raw.(*greptimedbclusterv1alpha1.GreptimeDBCluster)
if !ok {
return fmt.Errorf("invalid cluster type")
}

var oldReplicas int32
switch options.ComponentType {
case string(greptimedbclusterv1alpha1.FrontendComponentKind):
oldReplicas = rawCluster.Spec.Frontend.Replicas
rawCluster.Spec.Frontend.Replicas = options.Replicas
case string(greptimedbclusterv1alpha1.DatanodeComponentKind):
oldReplicas = rawCluster.Spec.Datanode.Replicas
rawCluster.Spec.Datanode.Replicas = options.Replicas
case string(greptimedbclusterv1alpha1.MetaComponentKind):
oldReplicas = rawCluster.Spec.Meta.Replicas
rawCluster.Spec.Meta.Replicas = options.Replicas
}

l.V(0).Infof("Scaling cluster %s in %s... from %d to %d\n", nn.Name, nn.Namespace, oldReplicas, options.Replicas)

if err = k8sDeployer.UpdateGreptimeDBCluster(ctx, nn.String(), &deployer.UpdateGreptimeDBClusterOptions{
NewCluster: &deployer.GreptimeDBCluster{Raw: rawCluster},
}); err != nil {
return err
}

l.V(0).Infof("Scaling cluster %s in %s is OK!\n", nn.Name, nn.Namespace)

return nil
}
5 changes: 2 additions & 3 deletions pkg/deployer/baremetal/component/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,8 @@ func runBinary(ctx context.Context, option *RunOptions, wg *sync.WaitGroup, logg
if err := cmd.Wait(); err != nil {
// Caught signal kill and interrupt error then ignore.
if exit, ok := err.(*exec.ExitError); ok {
if status, ok := exit.Sys().(syscall.WaitStatus); ok {
if status.Signaled() &&
(status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT) {
if status, ok := exit.Sys().(syscall.WaitStatus); ok && status.Signaled() {
if status.Signal() == syscall.SIGKILL || status.Signal() == syscall.SIGINT {
return
}
}
Expand Down
5 changes: 3 additions & 2 deletions pkg/deployer/baremetal/config/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const (
type MetaConfig struct {
*Config

CreationDate time.Time `yaml:"creationDate"`
ClusterDir string `yaml:"clusterDir"`
CreationDate time.Time `yaml:"creationDate"`
ClusterDir string `yaml:"clusterDir"`
ForegroundPid int `yaml:"foregroundPid"`
}

// Config is the desired state of a GreptimeDB cluster on bare metal.
Expand Down
7 changes: 4 additions & 3 deletions pkg/deployer/baremetal/deployer.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,10 @@ func (d *Deployer) createClusterConfigFile() error {
}

metaConfig := config.MetaConfig{
Config: d.config,
CreationDate: time.Now(),
ClusterDir: d.clusterDir,
Config: d.config,
CreationDate: time.Now(),
ClusterDir: d.clusterDir,
ForegroundPid: os.Getpid(),
}

out, err := yaml.Marshal(metaConfig)
Expand Down

0 comments on commit 7078dae

Please sign in to comment.