Skip to content

Commit

Permalink
ddl(ticdc): add charset and collate to ddl event (pingcap#8723) (ping…
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot authored Apr 12, 2023
1 parent 416ed67 commit 1f3151f
Show file tree
Hide file tree
Showing 5 changed files with 238 additions and 120 deletions.
8 changes: 8 additions & 0 deletions cdc/model/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,8 @@ type DDLEvent struct {
PreTableInfo *TableInfo `msg:"-"`
Type model.ActionType `msg:"-"`
Done bool `msg:"-"`
Charset string `msg:"-"`
Collate string `msg:"-"`
}

// FromJob fills the values with DDLEvent from DDL job
Expand Down Expand Up @@ -665,6 +667,9 @@ func (d *DDLEvent) FromJob(job *model.Job, preTableInfo *TableInfo, tableInfo *T
d.Type = job.Type
d.PreTableInfo = preTableInfo
d.TableInfo = tableInfo

d.Charset = job.Charset
d.Collate = job.Collate
// rebuild the query if necessary
rebuildQuery()
}
Expand All @@ -687,6 +692,9 @@ func (d *DDLEvent) FromRenameTablesJob(job *model.Job,
d.Type = model.ActionRenameTable
d.PreTableInfo = preTableInfo
d.TableInfo = tableInfo

d.Charset = job.Charset
d.Collate = job.Collate
}

// SingleTableTxn represents a transaction which includes many row events in a single table
Expand Down
23 changes: 18 additions & 5 deletions cdc/owner/ddl_sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ func (s *ddlSinkImpl) run(ctx context.Context) {

case ddl := <-s.ddlCh:
var err error
ddl.Query, err = addSpecialComment(ddl.Query)
ddl.Query, err = s.addSpecialComment(ddl)
if err != nil {
log.Error("Add special comment failed",
zap.String("namespace", s.changefeedID.Namespace),
Expand Down Expand Up @@ -384,13 +384,16 @@ func (s *ddlSinkImpl) isInitialized() bool {
}

// addSpecialComment translate tidb feature to comment
func addSpecialComment(ddlQuery string) (string, error) {
stms, _, err := parser.New().ParseSQL(ddlQuery)
func (s *ddlSinkImpl) addSpecialComment(ddl *model.DDLEvent) (string, error) {
stms, _, err := parser.New().Parse(ddl.Query, ddl.Charset, ddl.Collate)
if err != nil {
return "", errors.Trace(err)
}
if len(stms) != 1 {
log.Panic("invalid ddlQuery statement size", zap.String("ddlQuery", ddlQuery))
log.Panic("invalid ddlQuery statement size",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("ddlQuery", ddl.Query))
}
var sb strings.Builder
// translate TiDB feature to special comment
Expand All @@ -406,5 +409,15 @@ func addSpecialComment(ddlQuery string) (string, error) {
if err = stms[0].Restore(format.NewRestoreCtx(restoreFlags, &sb)); err != nil {
return "", errors.Trace(err)
}
return sb.String(), nil

result := sb.String()
log.Info("add special comment to DDL",
zap.String("namespace", s.changefeedID.Namespace),
zap.String("changefeed", s.changefeedID.ID),
zap.String("DDL", ddl.Query),
zap.String("charset", ddl.Charset),
zap.String("collate", ddl.Collate),
zap.String("result", result))

return result, nil
}
Loading

0 comments on commit 1f3151f

Please sign in to comment.