Skip to content

Commit

Permalink
enable charset_gbk
Browse files Browse the repository at this point in the history
  • Loading branch information
CharlesCheung96 committed Nov 6, 2024
1 parent c62c882 commit d30575e
Show file tree
Hide file tree
Showing 5 changed files with 306 additions and 37 deletions.
22 changes: 22 additions & 0 deletions .github/workflows/check_and_build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,25 @@ jobs:

- name: Build
run: make cdc

basic_e2e_test:
runs-on: ubuntu-latest
name: E2E Test
steps:
- name: Check out code
uses: actions/checkout@v2

- name: Setup Go environment
uses: actions/setup-go@v3
with:
go-version: '1.21'

- name: Integration Build
run: |
go build -o ./tools/bin/failpoint-ctl github.com/pingcap/failpoint/failpoint-ctl
make integration_test_build
- name: Test charset_gbk
run: |
tests/scripts/download-integration-test-binaries.sh master true
make integration_test CASE=charset_gbk
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ cdc:
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd

integration_test_build: check_failpoint_ctl
# $(FAILPOINT_ENABLE)
$(FAILPOINT_ENABLE)
$(GOTEST) -ldflags '$(LDFLAGS)' -c -cover -covermode=atomic \
-coverpkg=github.com/pingcap/ticdc/... \
-o bin/cdc.test github.com/pingcap/ticdc/cmd \
|| { $(FAILPOINT_DISABLE); echo "Failed to build cdc.test"; exit 1; }
$(GOBUILD) -ldflags '$(LDFLAGS)' -o bin/cdc ./cmd/main.go \
|| { $(FAILPOINT_DISABLE); exit 1; }
# $(FAILPOINT_DISABLE)
$(FAILPOINT_DISABLE)

failpoint-enable: check_failpoint_ctl
$(FAILPOINT_ENABLE)
Expand Down
70 changes: 35 additions & 35 deletions pkg/sink/mysql/mysql_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ func (w *MysqlWriter) FlushDDLEvent(event *commonEvent.DDLEvent) error {

if !(event.TiDBOnly && !w.cfg.IsTiDB) {
err := w.execDDLWithMaxRetries(event)

if err != nil {
log.Error("exec ddl failed", zap.Error(err))
return err
Expand Down Expand Up @@ -270,9 +269,6 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
case commonEvent.InfluenceTypeNormal:
tableIds = append(tableIds, relatedTables.TableIDs...)
case commonEvent.InfluenceTypeDB:
if w.tableSchemaStore == nil {
log.Panic("table schema store is nil")
}
ids := w.tableSchemaStore.GetTableIdsByDB(relatedTables.SchemaID)
tableIds = append(tableIds, ids...)
case commonEvent.InfluenceTypeAll:
Expand All @@ -299,41 +295,45 @@ func (w *MysqlWriter) SendDDLTs(event *commonEvent.DDLEvent) error {
tableIds = append(tableIds, table.TableID)
}

// generate query
//INSERT INTO `tidb_cdc`.`ddl_ts` (ticdc_cluster_id, changefeed, ddl_ts, table_id) values(...) ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;
var builder strings.Builder
builder.WriteString("INSERT INTO ")
builder.WriteString(filter.TiCDCSystemSchema)
builder.WriteString(".")
builder.WriteString(filter.DDLTsTable)
builder.WriteString("(ticdc_cluster_id, changefeed, ddl_ts, table_id) VALUES ")
if len(tableIds) > 0 {
// generate query
//INSERT INTO `tidb_cdc`.`ddl_ts` (ticdc_cluster_id, changefeed, ddl_ts, table_id) values(...) ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;
var builder strings.Builder
builder.WriteString("INSERT INTO ")
builder.WriteString(filter.TiCDCSystemSchema)
builder.WriteString(".")
builder.WriteString(filter.DDLTsTable)
builder.WriteString("(ticdc_cluster_id, changefeed, ddl_ts, table_id) VALUES ")

for idx, tableId := range tableIds {
builder.WriteString("('")
builder.WriteString(ticdcClusterID)
builder.WriteString("', '")
builder.WriteString(changefeedID)
builder.WriteString("', '")
builder.WriteString(ddlTs)
builder.WriteString("', ")
builder.WriteString(strconv.FormatInt(tableId, 10))
builder.WriteString(")")
if idx < len(tableIds)-1 {
builder.WriteString(", ")
for idx, tableId := range tableIds {
builder.WriteString("('")
builder.WriteString(ticdcClusterID)
builder.WriteString("', '")
builder.WriteString(changefeedID)
builder.WriteString("', '")
builder.WriteString(ddlTs)
builder.WriteString("', ")
builder.WriteString(strconv.FormatInt(tableId, 10))
builder.WriteString(")")
if idx < len(tableIds)-1 {
builder.WriteString(", ")
}
}
}
builder.WriteString(" ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;")
builder.WriteString(" ON DUPLICATE KEY UPDATE ddl_ts=VALUES(ddl_ts), created_at=CURRENT_TIMESTAMP;")

query := builder.String()
log.Info("query is", zap.Any("query", query))
_, err = tx.Exec(query)
if err != nil {
log.Error("failed to write ddl ts table", zap.Error(err))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write ddl ts table", zap.Error(err2))
query := builder.String()
log.Info("query is", zap.Any("query", query))
_, err = tx.Exec(query)
if err != nil {
log.Error("failed to write ddl ts table", zap.Error(err))
err2 := tx.Rollback()
if err2 != nil {
log.Error("failed to write ddl ts table", zap.Error(err2))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table;"))
}
return cerror.WrapError(cerror.ErrMySQLTxnError, errors.WithMessage(err, "failed to write ddl ts table;"))
} else {
log.Error("table ids is empty when write ddl ts table, FIX IT", zap.Any("event", event))
}

if len(dropTableIds) > 0 {
Expand Down
18 changes: 18 additions & 0 deletions pkg/sink/util/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/pingcap/ticdc/pkg/sink/codec/common"
cerror "github.com/pingcap/tiflow/pkg/errors"
"github.com/pingcap/tiflow/pkg/util"
"go.uber.org/zap"
)

// GetEncoderConfig returns the encoder config and validates the config.
Expand Down Expand Up @@ -78,17 +79,34 @@ func (s *TableSchemaStore) AddEvent(event *commonEvent.DDLEvent) {
s.tableIDStore.AddEvent(event)
}

func (s *TableSchemaStore) initialized() bool {
if s == nil || (s.tableIDStore == nil && s.tableNameStore == nil) {
log.Warn("TableSchemaStore is not initialized", zap.Any("tableSchemaStore", s))
return false
}
return true
}

func (s *TableSchemaStore) GetTableIdsByDB(schemaID int64) []int64 {
if !s.initialized() {
return nil
}
return s.tableIDStore.GetTableIdsByDB(schemaID)
}

func (s *TableSchemaStore) GetAllTableIds() []int64 {
if !s.initialized() {
return nil
}
return s.tableIDStore.GetAllTableIds()
}

// GetAllTableNames only will be called when maintainer send message to ask dispatcher to write checkpointTs to downstream.
// So the ts must be <= the latest received event ts of table trigger event dispatcher.
func (s *TableSchemaStore) GetAllTableNames(ts uint64) []*commonEvent.SchemaTableName {
if !s.initialized() {
return nil
}
return s.tableNameStore.GetAllTableNames(ts)
}

Expand Down
Loading

0 comments on commit d30575e

Please sign in to comment.