Skip to content

Commit

Permalink
dm: parse metadata without flavor (pingcap#10155)
Browse files Browse the repository at this point in the history
  • Loading branch information
GMHDBJD authored and lichunzhu committed Dec 25, 2023
1 parent 36fb585 commit e930150
Show file tree
Hide file tree
Showing 7 changed files with 17 additions and 17 deletions.
2 changes: 1 addition & 1 deletion dm/loader/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ func getMydumpMetadata(ctx context.Context, cli *clientv3.Client, cfg *config.Su
log.L().Warn("TestRemoveMetaFile Error", log.ShortError(err))
}
})
loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.Flavor, cfg.ExtStorage)
loc, _, err := dumpling.ParseMetaData(ctx, cfg.LoaderConfig.Dir, metafile, cfg.ExtStorage)
if err == nil {
return loc.Position.String(), loc.GTIDSetStr(), nil
}
Expand Down
15 changes: 7 additions & 8 deletions dm/pkg/dumpling/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ func ParseMetaData(
ctx context.Context,
dir string,
filename string,
flavor string,
extStorage brstorage.ExternalStorage,
) (*binlog.Location, *binlog.Location, error) {
fd, err := storage.OpenFile(ctx, dir, filename, extStorage)
Expand All @@ -54,11 +53,11 @@ func ParseMetaData(
}
defer fd.Close()

return parseMetaDataByReader(filename, flavor, fd)
return parseMetaDataByReader(filename, fd)
}

// ParseMetaData parses mydumper's output meta file by created reader and returns binlog location.
func parseMetaDataByReader(filename, flavor string, rd io.Reader) (*binlog.Location, *binlog.Location, error) {
func parseMetaDataByReader(filename string, rd io.Reader) (*binlog.Location, *binlog.Location, error) {
invalidErr := fmt.Errorf("file %s invalid format", filename)

var (
Expand Down Expand Up @@ -101,7 +100,7 @@ func parseMetaDataByReader(filename, flavor string, rd io.Reader) (*binlog.Locat
pos.Pos = uint32(pos64)
case "GTID":
// multiple GTID sets may cross multiple lines, continue to read them.
following, err3 := readFollowingGTIDs(br, flavor)
following, err3 := readFollowingGTIDs(br)
if err3 != nil {
return err3
}
Expand Down Expand Up @@ -154,7 +153,7 @@ func parseMetaDataByReader(filename, flavor string, rd io.Reader) (*binlog.Locat
return nil, nil, terror.ErrMetadataNoBinlogLoc.Generate(filename)
}

gset, err := gtid.ParserGTID(flavor, gtidStr)
gset, err := gtid.ParserGTID("", gtidStr)
if err != nil {
return nil, nil, invalidErr
}
Expand All @@ -165,7 +164,7 @@ func parseMetaDataByReader(filename, flavor string, rd io.Reader) (*binlog.Locat
if len(pos2.Name) == 0 || pos2.Pos == uint32(0) {
return nil, nil, invalidErr
}
gset2, err := gtid.ParserGTID(flavor, gtidStr2)
gset2, err := gtid.ParserGTID("", gtidStr2)
if err != nil {
return nil, nil, invalidErr
}
Expand All @@ -176,7 +175,7 @@ func parseMetaDataByReader(filename, flavor string, rd io.Reader) (*binlog.Locat
return locPtr, locPtr2, nil
}

func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) {
func readFollowingGTIDs(br *bufio.Reader) (string, error) {
var following strings.Builder
for {
line, err := br.ReadString('\n')
Expand All @@ -197,7 +196,7 @@ func readFollowingGTIDs(br *bufio.Reader, flavor string) (string, error) {
}

// try parse to verify it
_, err = gtid.ParserGTID(flavor, line[:end])
_, err = gtid.ParserGTID("", line[:end])
if err != nil {
// nolint:nilerr
return following.String(), nil // return the previous, not including this non-GTID line.
Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/dumpling/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ Finished dump at: 2020-09-30 12:16:49
for _, tc := range testCases {
err2 := os.WriteFile(f.Name(), []byte(tc.source), 0o644)
require.NoError(t, err2)
loc, loc2, err2 := ParseMetaData(ctx, fdir, fname, "mysql", nil)
loc, loc2, err2 := ParseMetaData(ctx, fdir, fname, nil)
require.NoError(t, err2)
require.Equal(t, tc.pos, loc.Position)
gs, _ := gtid.ParserGTID("mysql", tc.gsetStr)
Expand All @@ -251,7 +251,7 @@ Finished dump at: 2020-12-02 17:13:56
`
err = os.WriteFile(f.Name(), []byte(noBinlogLoc), 0o644)
require.NoError(t, err)
_, _, err = ParseMetaData(ctx, fdir, fname, "mysql", nil)
_, _, err = ParseMetaData(ctx, fdir, fname, nil)
require.True(t, terror.ErrMetadataNoBinlogLoc.Equal(err))
}

Expand Down
4 changes: 2 additions & 2 deletions dm/pkg/gtid/gtid.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"strings"

"github.com/go-mysql-org/go-mysql/mysql"
"github.com/pingcap/errors"
"github.com/pingcap/tiflow/dm/pkg/terror"
)

Expand All @@ -30,7 +29,8 @@ func ParserGTID(flavor, gtidStr string) (mysql.GTIDSet, error) {
)

if len(flavor) == 0 && len(gtidStr) == 0 {
return nil, errors.Errorf("empty flavor with empty gtid is invalid")
// regard as mysql, mariadb always enabled gtid
return mysql.ParseGTIDSet(mysql.MySQLFlavor, "")
}

fla := flavor
Expand Down
5 changes: 3 additions & 2 deletions dm/pkg/gtid/gtid_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,9 @@ func TestParseGTIDNoFlavor(t *testing.T) {
require.NoError(t, err)
require.IsType(t, &mysql.MariadbGTIDSet{}, gset)

_, err = ParserGTID("", "")
require.Error(t, err)
gset, err = ParserGTID("", "")
require.NoError(t, err)
require.IsType(t, &mysql.MysqlGTIDSet{}, gset)
}

func TestIsNilGTIDSet(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1302,7 +1302,7 @@ func (cp *RemoteCheckPoint) genUpdateSQL(cpSchema, cpTable string, location binl
func (cp *RemoteCheckPoint) parseMetaData(ctx context.Context) (*binlog.Location, *binlog.Location, error) {
// `metadata` is mydumper's output meta file name
filename := "metadata"
loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.Flavor, cp.cfg.ExtStorage)
loc, loc2, err := dumpling.ParseMetaData(ctx, cp.cfg.LoaderConfig.Dir, filename, cp.cfg.ExtStorage)
if err != nil {
toPrint, err2 := storage.ReadFile(ctx, cp.cfg.LoaderConfig.Dir, filename, nil)
if err2 != nil {
Expand Down
2 changes: 1 addition & 1 deletion dm/syncer/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2964,7 +2964,7 @@ func (s *Syncer) loadTableStructureFromDump(ctx context.Context) error {
zap.String("db", db),
zap.String("path", s.cfg.LoaderConfig.Dir),
zap.String("file", file),
zap.Error(err))
zap.Error(err2))
setFirstErr(err2)
continue
}
Expand Down

0 comments on commit e930150

Please sign in to comment.