diff --git a/cdc/entry/mounter.go b/cdc/entry/mounter.go index 57316aae6b9..8304bbdebaf 100644 --- a/cdc/entry/mounter.go +++ b/cdc/entry/mounter.go @@ -16,6 +16,7 @@ package entry import ( "bytes" "context" + "encoding/hex" "encoding/json" "fmt" "math" @@ -172,10 +173,18 @@ func (m *mounter) unmarshalAndMountRowChanged(ctx context.Context, raw *model.Ra } tableInfo, exist := snap.PhysicalTableByID(physicalTableID) if !exist { + // for truncate table and truncate table partition DDL, the table ID is changed, but DML can be inserted to TiKV with old table ID. + // normally, cdc will close the old table pipeline and create a new one, and these invalid DMLs keys will not be pulled by CDC, + // but if redo is enabled or push based table pipeline is enabled, puller and mounter are not blocked by barrier ts. + // So some invalid DML keys will be decoded before processor removing the table pipeline if snap.IsTruncateTableID(physicalTableID) { log.Debug("skip the DML of truncated table", zap.Uint64("ts", raw.CRTs), zap.Int64("tableID", physicalTableID)) return nil, nil } + log.Error("can not found table schema", + zap.Uint64("ts", raw.CRTs), + zap.String("key", hex.EncodeToString(raw.Key)), + zap.Int64("tableID", physicalTableID)) return nil, cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(physicalTableID) } if bytes.HasPrefix(key, recordPrefix) { diff --git a/cdc/entry/schema/snapshot.go b/cdc/entry/schema/snapshot.go index 5367a038eb4..46416437dde 100644 --- a/cdc/entry/schema/snapshot.go +++ b/cdc/entry/schema/snapshot.go @@ -468,11 +468,16 @@ func (s *Snapshot) DoHandleDDL(job *timodel.Job) error { if err != nil { return errors.Trace(err) } - case timodel.ActionTruncateTablePartition, + case timodel.ActionTruncateTablePartition: + err := s.inner.updatePartition(getWrapTableInfo(job), true, job.BinlogInfo.FinishedTS) + if err != nil { + return errors.Trace(err) + } + case timodel.ActionAddTablePartition, timodel.ActionDropTablePartition, timodel.ActionReorganizePartition: - err := s.inner.updatePartition(getWrapTableInfo(job), job.BinlogInfo.FinishedTS) + err := s.inner.updatePartition(getWrapTableInfo(job), false, job.BinlogInfo.FinishedTS) if err != nil { return errors.Trace(err) } @@ -863,7 +868,7 @@ func (s *snapshot) doCreateTable(tbInfo *model.TableInfo, currentTs uint64) { } // updatePartition updates partition info for `tbInfo`. -func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) error { +func (s *snapshot) updatePartition(tbInfo *model.TableInfo, isTruncate bool, currentTs uint64) error { oldTbInfo, ok := s.physicalTableByID(tbInfo.ID) if !ok { return cerror.ErrSnapshotTableNotFound.GenWithStackByArgs(tbInfo.ID) @@ -888,6 +893,7 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er for _, partition := range oldPi.Definitions { s.partitions.ReplaceOrInsert(newVersionedID(partition.ID, tag)) } + newPartitionIDMap := make(map[int64]struct{}, len(newPi.NewPartitionIDs)) for _, partition := range newPi.Definitions { vid := newVersionedID(partition.ID, tag) vid.target = tbInfo @@ -895,6 +901,14 @@ func (s *snapshot) updatePartition(tbInfo *model.TableInfo, currentTs uint64) er if ineligible { s.ineligibleTables.ReplaceOrInsert(newVersionedID(partition.ID, tag)) } + newPartitionIDMap[partition.ID] = struct{}{} + } + if isTruncate { + for _, partition := range oldPi.Definitions { + if _, ok := newPartitionIDMap[partition.ID]; !ok { + s.truncatedTables.ReplaceOrInsert(newVersionedID(partition.ID, tag)) + } + } } s.currentTs = currentTs @@ -984,7 +998,7 @@ func (s *snapshot) exchangePartition(targetTable *model.TableInfo, currentTS uin // ref: https://github.com/pingcap/tidb/issues/43819 targetTable.SchemaID = oldTable.SchemaID targetTable.TableName = oldTable.TableName - err = s.updatePartition(targetTable, currentTS) + err = s.updatePartition(targetTable, false, currentTS) if err != nil { return errors.Trace(err) } diff --git a/cdc/entry/schema/snapshot_test.go b/cdc/entry/schema/snapshot_test.go index d8248784f6c..68106599e9b 100644 --- a/cdc/entry/schema/snapshot_test.go +++ b/cdc/entry/schema/snapshot_test.go @@ -219,19 +219,19 @@ func TestUpdatePartition(t *testing.T) { oldTb = newTbInfo(1, "DB_1", 11) oldTb.Partition = nil require.Nil(t, snap.inner.createTable(oldTb, 110)) - require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), 120)) + require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), false, 120)) // updatePartition fails if the new table is not partitioned. require.Nil(t, snap.inner.dropTable(11, 130)) require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 140)) newTb = newTbInfo(1, "DB_1", 11) newTb.Partition = nil - require.Error(t, snap.inner.updatePartition(newTb, 150)) + require.Error(t, snap.inner.updatePartition(newTb, false, 150)) snap1 = snap.Copy() newTb = newTbInfo(1, "DB_1", 11) newTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: 11 + 65536*2} - require.Nil(t, snap.inner.updatePartition(newTb, 160)) + require.Nil(t, snap.inner.updatePartition(newTb, false, 160)) snap2 = snap.Copy() info, _ = snap1.PhysicalTableByID(11) @@ -253,6 +253,31 @@ func TestUpdatePartition(t *testing.T) { require.True(t, snap2.IsIneligibleTableID(11+65536*2)) } +func TestTruncateTablePartition(t *testing.T) { + var oldTb, newTb *model.TableInfo + + snap := NewEmptySnapshot(false) + require.Nil(t, snap.inner.createSchema(newDBInfo(1), 100)) + + // updatePartition fails if the old table is not partitioned. + oldTb = newTbInfo(1, "DB_1", 11) + oldTb.Partition = nil + require.Nil(t, snap.inner.createTable(oldTb, 110)) + require.Error(t, snap.inner.updatePartition(newTbInfo(1, "DB_1", 11), false, 120)) + + // updatePartition fails if the new table is not partitioned. + require.Nil(t, snap.inner.dropTable(11, 130)) + require.Nil(t, snap.inner.createTable(newTbInfo(1, "DB_1", 11), 140)) + newTb = newTbInfo(1, "DB_1", 11) + newTb.Partition = nil + require.Error(t, snap.inner.updatePartition(newTb, false, 150)) + + newTb = newTbInfo(1, "DB_1", 11) + newTb.Partition.Definitions[0] = timodel.PartitionDefinition{ID: 11 + 65536*2} + require.Nil(t, snap.inner.updatePartition(newTb, true, 160)) + require.True(t, snap.IsTruncateTableID(11+65536)) +} + func TestExchangePartition(t *testing.T) { var targetTb, sourceTb *model.TableInfo