This repository has been archived by the owner on Nov 24, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 188
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat: upgrade v1.0.x metadata tables (#858)
- Loading branch information
1 parent
734f65e
commit d71b25d
Showing
11 changed files
with
676 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,136 @@ | ||
// Copyright 2020 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package reader | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/parser" | ||
uuid "github.com/satori/go.uuid" | ||
gmysql "github.com/siddontang/go-mysql/mysql" | ||
"github.com/siddontang/go-mysql/replication" | ||
|
||
"github.com/pingcap/dm/pkg/binlog/event" | ||
"github.com/pingcap/dm/pkg/gtid" | ||
"github.com/pingcap/dm/pkg/terror" | ||
"github.com/pingcap/dm/relay/common" | ||
) | ||
|
||
// GetGTIDsForPos tries to get GTID sets for the specified binlog position (for the corresponding txn). | ||
// NOTE: this method is very similar with `relay/writer/file_util.go/getTxnPosGTIDs`, unify them if needed later. | ||
// NOTE: this method is not well tested directly, but more tests have already been done for `relay/writer/file_util.go/getTxnPosGTIDs`. | ||
func GetGTIDsForPos(ctx context.Context, r Reader, endPos gmysql.Position, parser2 *parser.Parser) (gtid.Set, error) { | ||
// start to get and parse binlog event from the beginning of the file. | ||
startPos := gmysql.Position{ | ||
Name: endPos.Name, | ||
Pos: 0, | ||
} | ||
err := r.StartSyncByPos(startPos) | ||
if err != nil { | ||
return nil, err | ||
} | ||
defer r.Close() | ||
|
||
var ( | ||
flavor string | ||
latestPos uint32 | ||
latestGSet gmysql.GTIDSet | ||
nextGTIDStr string // can be recorded if the coming transaction completed | ||
) | ||
for { | ||
var e *replication.BinlogEvent | ||
e, err = r.GetEvent(ctx) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
// NOTE: only update endPos/GTIDs for DDL/XID to get an complete transaction. | ||
switch ev := e.Event.(type) { | ||
case *replication.QueryEvent: | ||
isDDL := common.CheckIsDDL(string(ev.Query), parser2) | ||
if isDDL { | ||
if latestGSet == nil { | ||
// GTID not enabled, can't get GTIDs for the position. | ||
return nil, errors.Errorf("should have a GTIDEvent before the DDL QueryEvent %+v", e.Header) | ||
} | ||
err = latestGSet.Update(nextGTIDStr) | ||
if err != nil { | ||
return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr) | ||
} | ||
latestPos = e.Header.LogPos | ||
} | ||
case *replication.XIDEvent: | ||
if latestGSet == nil { | ||
// GTID not enabled, can't get GTIDs for the position. | ||
return nil, errors.Errorf("should have a GTIDEvent before the XIDEvent %+v", e.Header) | ||
} | ||
err = latestGSet.Update(nextGTIDStr) | ||
if err != nil { | ||
return nil, terror.Annotatef(err, "update GTID set %v with GTID %s", latestGSet, nextGTIDStr) | ||
} | ||
latestPos = e.Header.LogPos | ||
case *replication.GTIDEvent: | ||
if latestGSet == nil { | ||
return nil, errors.Errorf("should have a PreviousGTIDsEvent before the GTIDEvent %+v", e.Header) | ||
} | ||
// learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L736 | ||
u, _ := uuid.FromBytes(ev.SID) | ||
nextGTIDStr = fmt.Sprintf("%s:%d", u.String(), ev.GNO) | ||
case *replication.MariadbGTIDEvent: | ||
if latestGSet == nil { | ||
return nil, errors.Errorf("should have a MariadbGTIDListEvent before the MariadbGTIDEvent %+v", e.Header) | ||
} | ||
// learn from: https://github.com/siddontang/go-mysql/blob/c6ab05a85eb86dc51a27ceed6d2f366a32874a24/replication/binlogsyncer.go#L745 | ||
GTID := ev.GTID | ||
nextGTIDStr = fmt.Sprintf("%d-%d-%d", GTID.DomainID, GTID.ServerID, GTID.SequenceNumber) | ||
case *replication.PreviousGTIDsEvent: | ||
// if GTID enabled, we can get a PreviousGTIDEvent after the FormatDescriptionEvent | ||
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L4549 | ||
// ref: https://github.com/mysql/mysql-server/blob/8cc757da3d87bf4a1f07dcfb2d3c96fed3806870/sql/binlog.cc#L5161 | ||
var gSet gtid.Set | ||
gSet, err = gtid.ParserGTID(gmysql.MySQLFlavor, ev.GTIDSets) | ||
if err != nil { | ||
return nil, err | ||
} | ||
latestGSet = gSet.Origin() | ||
flavor = gmysql.MySQLFlavor | ||
case *replication.MariadbGTIDListEvent: | ||
// a MariadbGTIDListEvent logged in every binlog to record the current replication state if GTID enabled | ||
// ref: https://mariadb.com/kb/en/library/gtid_list_event/ | ||
gSet, err2 := event.GTIDsFromMariaDBGTIDListEvent(e) | ||
if err2 != nil { | ||
return nil, terror.Annotatef(err2, "get GTID set from MariadbGTIDListEvent %+v", e.Header) | ||
} | ||
latestGSet = gSet.Origin() | ||
flavor = gmysql.MariaDBFlavor | ||
} | ||
|
||
if latestPos == endPos.Pos { | ||
// reach the end position, return the GTID sets. | ||
if latestGSet == nil { | ||
return nil, errors.Errorf("no GTIDs get for position %s", endPos) | ||
} | ||
var latestGTIDs gtid.Set | ||
latestGTIDs, err = gtid.ParserGTID(flavor, latestGSet.String()) | ||
if err != nil { | ||
return nil, terror.Annotatef(err, "parse GTID set %s with flavor %s", latestGSet.String(), flavor) | ||
} | ||
return latestGTIDs, nil | ||
} else if latestPos > endPos.Pos { | ||
return nil, errors.Errorf("invalid position %s or GTID not enabled in upstream", endPos) | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
// Copyright 2020 PingCAP, Inc. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package reader | ||
|
||
import ( | ||
"context" | ||
"time" | ||
|
||
. "github.com/pingcap/check" | ||
gmysql "github.com/siddontang/go-mysql/mysql" | ||
"github.com/siddontang/go-mysql/replication" | ||
|
||
"github.com/pingcap/dm/pkg/utils" | ||
) | ||
|
||
// added to testTCPReaderSuite to re-use DB connection. | ||
func (t *testTCPReaderSuite) TestGetGTIDsForPos(c *C) { | ||
var ( | ||
cfg = replication.BinlogSyncerConfig{ | ||
ServerID: serverIDs[1], | ||
Flavor: flavor, | ||
Host: t.host, | ||
Port: uint16(t.port), | ||
User: t.user, | ||
Password: t.password, | ||
UseDecimal: true, | ||
VerifyChecksum: true, | ||
} | ||
ctx, cancel = context.WithTimeout(context.Background(), 30*time.Second) | ||
) | ||
defer cancel() | ||
|
||
endPos, endGS, err := utils.GetMasterStatus(t.db, flavor) | ||
c.Assert(err, IsNil) | ||
|
||
parser2, err := utils.GetParser(t.db, false) | ||
c.Assert(err, IsNil) | ||
|
||
r1 := NewTCPReader(cfg) | ||
c.Assert(r1, NotNil) | ||
defer r1.Close() | ||
|
||
gs, err := GetGTIDsForPos(ctx, r1, endPos, parser2) | ||
c.Assert(err, IsNil) | ||
c.Assert(gs.Equal(endGS), IsTrue) | ||
|
||
// try to get for an invalid pos. | ||
r2 := NewTCPReader(cfg) | ||
c.Assert(r2, NotNil) | ||
defer r2.Close() | ||
gs, err = GetGTIDsForPos(ctx, r2, gmysql.Position{ | ||
Name: endPos.Name, | ||
Pos: endPos.Pos - 1, | ||
}, parser2) | ||
c.Assert(err, ErrorMatches, ".*invalid position .* or GTID not enabled in upstream.*") | ||
c.Assert(gs, IsNil) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.