Skip to content

Commit

Permalink
binlog-filter: add new event type
Browse files Browse the repository at this point in the history
  • Loading branch information
asddongmen committed Jul 4, 2022
1 parent cf6b7a8 commit 682cf94
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 5 deletions.
84 changes: 79 additions & 5 deletions pkg/binlog-filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,41 @@ const (
CreateView EventType = "create view"
DropView EventType = "drop view"
AlertTable EventType = "alter table"

CreateSchema EventType = "create schema" // alias of CreateDatabase
DropSchema EventType = "drop schema" // alias of DropDatabase
AddColumn EventType = "add column"
DropColumn EventType = "drop column"
AddIndex EventType = "add index" // alias of CreateIndex
AddForeignKey EventType = "add foreign key"
DropForeignKey EventType = "drop foreign key"
ModifyColumn EventType = "modify column"
RebaseAutoID EventType = "rebase auto_increment"
SetDefaultValue EventType = "set default value"
ShardRowID EventType = "shard row id"
ModifyTableComment EventType = "modify table comment"
RenameIndex EventType = "rename index"
AddTablePartition EventType = "add table partition"
DropTablePartition EventType = "drop table partition"
TruncateTablePartition EventType = "truncate table partition"
ModifyTableCharsetAndCollate EventType = "modify table charset and collate"
ModifySchemaCharsetAndCollate EventType = "modify schema charset and collate"
RecoverTable EventType = "recover table"
LockTable EventType = "lock table"
UnlockTable EventType = "unlock table"
RepairTable EventType = "repair table"
SetTiFlashReplica EventType = "set tiflash replica"
UpdateTiFlashReplicaStatus EventType = "update tiflash replica status"
AddPrimaryKey EventType = "add primary key"
DropPrimaryKey EventType = "drop primary key"
CreateSequence EventType = "create sequence"
AlterSequence EventType = "alter sequence"
DropSequence EventType = "drop sequence"
// if need, add more AlertTableOption = "alert table option"

NullEvent EventType = ""
// InvalidEvent is used to indicate invalid event type
InvalidEvent EventType = "InvalidEvent"
)

// ClassifyEvent classify event into dml/ddl
Expand All @@ -75,12 +107,18 @@ func ClassifyEvent(event EventType) (EventType, error) {
case InsertEvent, UpdateEvent, DeleteEvent:
return dml, nil
case CreateDatabase, DropDatabase, CreateTable, DropTable, TruncateTable, RenameTable,
CreateIndex, DropIndex, CreateView, DropView, AlertTable:
CreateIndex, DropIndex, CreateView, DropView, AlertTable, AddColumn, DropColumn,
AddForeignKey, DropForeignKey, ModifyColumn, RebaseAutoID, SetDefaultValue,
ShardRowID, ModifyTableComment, RenameIndex, AddTablePartition, DropTablePartition,
TruncateTablePartition, ModifyTableCharsetAndCollate, ModifySchemaCharsetAndCollate,
RecoverTable, LockTable, UnlockTable, RepairTable, SetTiFlashReplica,
UpdateTiFlashReplica, AddPrimaryKey, DropPrimaryKey, CreateSequence,
AlterSequence, DropSequence, CreateSchema, DropSchema, AddIndex:
return ddl, nil
case NullEvent:
return NullEvent, nil
default:
return NoneEvent, errors.NotValidf("event type %s", event)
return InvalidEvent, errors.NotValidf("event type %s", event)
}
}

Expand Down Expand Up @@ -115,14 +153,50 @@ func (b *BinlogEventRule) Valid() error {
return errors.Errorf("action of binlog event rule %+v should not be empty", b)
}

// TODO: check validity of dml/ddl event.
for i := range b.Events {
b.Events[i] = EventType(strings.ToLower(string(b.Events[i])))
et, err := b.toEvent(string(b.Events[i]))
if err != nil {
return errors.Annotatef(ErrInvalidEventType, "event type %s", b.Events[i])
}
b.Events[i] = et
}

return nil
}

// ErrInvalidEventType is returned when event type is not valid.
var ErrInvalidEventType = errors.New("event type not found")

// toEvent converts event type string to EventType and check if it is valid.
func (b *BinlogEventRule) toEvent(es string) (EventType, error) {
event := EventType(strings.ToLower(es))
switch event {
case dml, ddl, AllEvent, AllDDL, AllDML,
NoneEvent, NoneDDL, NoneDML,
InsertEvent, UpdateEvent, DeleteEvent,
CreateDatabase, DropDatabase, CreateTable,
DropTable, TruncateTable, RenameTable,
CreateIndex, DropIndex, CreateView, DropView,
AlertTable, AddColumn, DropColumn,
AddForeignKey, DropForeignKey,
ModifyColumn, RebaseAutoID, SetDefaultValue,
ShardRowID, ModifyTableComment, RenameIndex,
AddTablePartition, DropTablePartition, TruncateTablePartition,
ModifyTableCharsetAndCollate, ModifySchemaCharsetAndCollate,
RecoverTable, LockTable, UnlockTable, RepairTable,
SetTiFlashReplica, UpdateTiFlashReplica, AddPrimaryKey, DropPrimaryKey,
CreateSequence, AlterSequence, DropSequence:
return event, nil
case CreateSchema: // alias of CreateDatabase
return CreateDatabase, nil
case DropSchema: // alias of DropDatabase
return DropDatabase, nil
case AddIndex: // alias of CreateIndex
return CreateIndex, nil
default:
return InvalidEvent, ErrInvalidEventType
}
}

// BinlogEvent filters binlog events by given rules
type BinlogEvent struct {
selector.Selector
Expand Down
26 changes: 26 additions & 0 deletions pkg/binlog-filter/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,3 +245,29 @@ func (t *testFilterSuite) TestGlobalFilter(c *C) {
c.Assert(action, Equals, cs.action)
}
}

func (t *testFilterSuit) TestToEvent(c *C) {
cases := []struct {
eventStr string
event EventType
err error
}{
{"", NullEvent, nil},
{"insert", InsertEvent, nil},
{"update", UpdateEvent, nil},
{"delete", DeleteEvent, nil},
{"create", CreateEvent, nil},
{"create schema", CreateDatabase, nil},
{"drop schema", DropDatabase, nil},
{"add index", CreateIndex, nil},
{"xxx", InvalidEvent, ErrInvalidEvent},
{"I don't know", InvalidEvent, ErrInvalidEvent},
{"add column", AddColumn, nil},
{"drop column", DropColumn, nil},
}
for _, cs := range cases {
event, err := ToEvent(cs.eventStr)
c.Assert(event, Equals, cs.event)
c.Assert(err, Equals, cs.event)
}
}

0 comments on commit 682cf94

Please sign in to comment.