From a3fff4228946c2a792bfa6f3383151d0efd9a701 Mon Sep 17 00:00:00 2001 From: wjHuang Date: Fri, 11 Nov 2022 13:05:53 +0800 Subject: [PATCH] ddl: refine logic of OwnerCheckAllVersions (#39070) --- ddl/syncer/syncer.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/ddl/syncer/syncer.go b/ddl/syncer/syncer.go index b2285351f83ae..ff7eaaa446893 100644 --- a/ddl/syncer/syncer.go +++ b/ddl/syncer/syncer.go @@ -262,7 +262,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i // If MDL is enabled, updatedMap is used to check if all the servers report the least version. // updatedMap is initialed to record all the server in every loop. We delete a server from the map if it gets the metadata lock(the key version equal the given version. // updatedMap should be empty if all the servers get the metadata lock. - updatedMap := make(map[string]struct{}) + updatedMap := make(map[string]string) for { if util.IsContextDone(ctx) { // ctx is canceled or timeout. @@ -278,9 +278,23 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i if err != nil { return err } - updatedMap = make(map[string]struct{}) + updatedMap = make(map[string]string) + instance2id := make(map[string]string) + + // Set updatedMap according to the serverInfos, and remove some invalid serverInfos. for _, info := range serverInfos { - updatedMap[info.ID] = struct{}{} + instance := fmt.Sprintf("%s:%d", info.IP, info.Port) + if id, ok := instance2id[instance]; ok { + if info.StartTimestamp > serverInfos[id].StartTimestamp { + // Replace it. + delete(updatedMap, id) + updatedMap[info.ID] = fmt.Sprintf("instance ip %s, port %d, id %s", info.IP, info.Port, info.ID) + instance2id[instance] = info.ID + } + } else { + updatedMap[info.ID] = fmt.Sprintf("instance ip %s, port %d, id %s", info.IP, info.Port, info.ID) + instance2id[instance] = info.ID + } } } @@ -315,6 +329,9 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i } if len(updatedMap) > 0 { succ = false + for _, info := range updatedMap { + logutil.BgLogger().Info("[ddl] syncer check all versions, someone is not synced", zap.String("info", info), zap.Any("ddl id", jobID), zap.Any("ver", latestVer)) + } } } else { for _, kv := range resp.Kvs { @@ -337,7 +354,7 @@ func (s *schemaVersionSyncer) OwnerCheckAllVersions(ctx context.Context, jobID i notMatchVerCnt++ break } - updatedMap[string(kv.Key)] = struct{}{} + updatedMap[string(kv.Key)] = "" } }