diff --git a/pkg/cluster/operation/upgrade.go b/pkg/cluster/operation/upgrade.go
index 56643b0944..12a4115414 100644
--- a/pkg/cluster/operation/upgrade.go
+++ b/pkg/cluster/operation/upgrade.go
@@ -52,11 +52,36 @@ func Upgrade(
 			continue
 		}
 
-		// Transfer leader of evict leader if the component is TiKV/PD in non-force mode
+		log.Infof("Upgrading component %s", component.Name())
 
-		log.Infof("Restarting component %s", component.Name())
+		// some instances are upgraded after others
+		deferInstances := make([]spec.Instance, 0)
 
 		for _, instance := range instances {
+			switch component.Name() {
+			case spec.ComponentPD:
+				// defer PD leader to be upgraded after others
+				isLeader, err := instance.(*spec.PDInstance).IsLeader(topo, int(options.APITimeout), tlsCfg)
+				if err != nil {
+					return err
+				}
+				if isLeader {
+					deferInstances = append(deferInstances, instance)
+					log.Debugf("Deferred upgrading of PD leader %s", instance.ID())
+					continue
+				}
+			default:
+				// do nothing, kept for future usage with other components
+			}
+
+			if err := upgradeInstance(ctx, topo, instance, options, tlsCfg); err != nil {
+				return err
+			}
+		}
+
+		// process deferred instances
+		for _, instance := range deferInstances {
+			log.Debugf("Upgrading deferred instance %s...", instance.ID())
 			if err := upgradeInstance(ctx, topo, instance, options, tlsCfg); err != nil {
 				return err
 			}
diff --git a/pkg/cluster/spec/pd.go b/pkg/cluster/spec/pd.go
index 0b35a3b795..ab5031d992 100644
--- a/pkg/cluster/spec/pd.go
+++ b/pkg/cluster/spec/pd.go
@@ -289,6 +289,26 @@ func (i *PDInstance) ScaleConfig(
 
 var _ RollingUpdateInstance = &PDInstance{}
 
+// IsLeader checks if the instance is the PD leader
+func (i *PDInstance) IsLeader(topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) (bool, error) {
+	tidbTopo, ok := topo.(*Specification)
+	if !ok {
+		panic("topo should be type of tidb topology")
+	}
+	pdClient := api.NewPDClient(tidbTopo.GetPDList(), time.Second*5, tlsCfg)
+
+	return i.isLeader(pdClient)
+}
+
+func (i *PDInstance) isLeader(pdClient *api.PDClient) (bool, error) {
+	leader, err := pdClient.GetLeader()
+	if err != nil {
+		return false, errors.Annotatef(err, "failed to get PD leader %s", i.GetHost())
+	}
+
+	return leader.Name == i.Name, nil
+}
+
 // PreRestart implements RollingUpdateInstance interface.
 func (i *PDInstance) PreRestart(topo Topology, apiTimeoutSeconds int, tlsCfg *tls.Config) error {
 	timeoutOpt := &utils.RetryOption{
@@ -300,15 +320,13 @@ func (i *PDInstance) PreRestart(topo Topology, apiTimeoutSeconds int, tlsCfg *tl
 	if !ok {
 		panic("topo should be type of tidb topology")
 	}
+	pdClient := api.NewPDClient(tidbTopo.GetPDList(), time.Second*5, tlsCfg)
 
-	pdClient := api.NewPDClient(tidbTopo.GetPDList(), 5*time.Second, tlsCfg)
-
-	leader, err := pdClient.GetLeader()
+	isLeader, err := i.isLeader(pdClient)
 	if err != nil {
-		return errors.Annotatef(err, "failed to get PD leader %s", i.GetHost())
+		return err
 	}
-
-	if len(tidbTopo.PDServers) > 1 && leader.Name == i.Name {
+	if len(tidbTopo.PDServers) > 1 && isLeader {
 		if err := pdClient.EvictPDLeader(timeoutOpt); err != nil {
 			return errors.Annotatef(err, "failed to evict PD leader %s", i.GetHost())
 		}