Skip to content

Commit

Permalink
Merge branch 'master' into fix_topo_scalein
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Aug 10, 2022
2 parents d0fe420 + b11f1c4 commit 05a3fcb
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 10 deletions.
41 changes: 41 additions & 0 deletions pkg/cluster/operation/scale_in.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func ScaleInCluster(
cdcInstances := make([]spec.Instance, 0)
// Delete member from cluster
for _, component := range cluster.ComponentsByStartOrder() {
deferInstances := make([]spec.Instance, 0)
for _, instance := range component.Instances() {
if !deletedNodes.Exist(instance.ID()) {
continue
Expand All @@ -274,6 +275,46 @@ func ScaleInCluster(
continue
}

if component.Role() == spec.ComponentPD {
// defer PD leader to be scale-in after others
isLeader, err := instance.(*spec.PDInstance).IsLeader(ctx, cluster, int(options.APITimeout), tlsCfg)
if err != nil {
logger.Warnf("cannot found pd leader, ignore: %s", err)
return err
}
if isLeader {
deferInstances = append(deferInstances, instance)
logger.Debugf("Deferred scale-in of PD leader %s", instance.ID())
continue
}
}

err := deleteMember(ctx, component, instance, pdClient, binlogClient, options.APITimeout)
if err != nil {
return errors.Trace(err)
}

if !asyncOfflineComps.Exist(instance.ComponentName()) {
instCount[instance.GetHost()]--
if err := StopAndDestroyInstance(ctx, cluster, instance, options, false, instCount[instance.GetHost()] == 0, tlsCfg); err != nil {
return err
}
} else {
logger.Warnf(color.YellowString("The component `%s` will become tombstone, maybe exists in several minutes or hours, after that you can use the prune command to clean it",
component.Name()))
}
}

// process deferred instances
for _, instance := range deferInstances {
// actually, it must be the pd leader at the moment, so the `PreRestart` always triggered.
rollingInstance, ok := instance.(spec.RollingUpdateInstance)
if ok {
if err := rollingInstance.PreRestart(ctx, cluster, int(options.APITimeout), tlsCfg); err != nil {
return errors.Trace(err)
}
}

err := deleteMember(ctx, component, instance, pdClient, binlogClient, options.APITimeout)
if err != nil {
return errors.Trace(err)
Expand Down
8 changes: 7 additions & 1 deletion pkg/cluster/operation/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,13 @@ func Upgrade(
return RestartMonitored(ctx, uniqueHosts.Slice(), noAgentHosts, topo.GetMonitoredOptions(), options.OptTimeout)
}

func upgradeInstance(ctx context.Context, topo spec.Topology, instance spec.Instance, options Options, tlsCfg *tls.Config) (err error) {
func upgradeInstance(
ctx context.Context,
topo spec.Topology,
instance spec.Instance,
options Options,
tlsCfg *tls.Config,
) (err error) {
// insert checkpoint
point := checkpoint.Acquire(ctx, upgradePoint, map[string]interface{}{"instance": instance.ID()})
defer func() {
Expand Down
10 changes: 1 addition & 9 deletions pkg/cluster/spec/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,14 +241,7 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
client := api.NewCDCOpenAPIClient(ctx, []string{address}, 5*time.Second, tlsCfg)
captures, err := client.GetAllCaptures()
if err != nil {
logger.Warnf("cdc pre-restart skipped, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
return nil
}

// this may happen all other captures crashed, only this one alive,
// no need to drain the capture, just return it to trigger hard restart.
if len(captures) <= 1 {
logger.Debugf("cdc pre-restart finished, only one alive capture found, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
logger.Debugf("cdc pre-restart skipped, cannot get all captures, trigger hard restart, addr: %s, elapsed: %+v", address, time.Since(start))
return nil
}

Expand All @@ -257,7 +250,6 @@ func (i *CDCInstance) PreRestart(ctx context.Context, topo Topology, apiTimeoutS
found bool
isOwner bool
)

for _, capture := range captures {
if address == capture.AdvertiseAddr {
found = true
Expand Down

0 comments on commit 05a3fcb

Please sign in to comment.