diff --git a/go/vt/proto/binlogdata/binlogdata.pb.go b/go/vt/proto/binlogdata/binlogdata.pb.go index 804e5f414e6..583f9d2fcb1 100644 --- a/go/vt/proto/binlogdata/binlogdata.pb.go +++ b/go/vt/proto/binlogdata/binlogdata.pb.go @@ -56,29 +56,36 @@ func (OnDDLAction) EnumDescriptor() ([]byte, []int) { return fileDescriptor_5fd02bcb2e350dad, []int{0} } -// VEventType enumerates the event types. -// This list is comprehensive. Many of these types +// VEventType enumerates the event types. Many of these types // will not be encountered in RBR mode. type VEventType int32 const ( - VEventType_UNKNOWN VEventType = 0 - VEventType_GTID VEventType = 1 - VEventType_BEGIN VEventType = 2 - VEventType_COMMIT VEventType = 3 - VEventType_ROLLBACK VEventType = 4 - VEventType_DDL VEventType = 5 - VEventType_INSERT VEventType = 6 - VEventType_REPLACE VEventType = 7 - VEventType_UPDATE VEventType = 8 - VEventType_DELETE VEventType = 9 - VEventType_SET VEventType = 10 - VEventType_OTHER VEventType = 11 - VEventType_ROW VEventType = 12 - VEventType_FIELD VEventType = 13 + VEventType_UNKNOWN VEventType = 0 + VEventType_GTID VEventType = 1 + VEventType_BEGIN VEventType = 2 + VEventType_COMMIT VEventType = 3 + VEventType_ROLLBACK VEventType = 4 + VEventType_DDL VEventType = 5 + // INSERT, REPLACE, UPDATE, DELETE and SET will not be seen in RBR mode. + VEventType_INSERT VEventType = 6 + VEventType_REPLACE VEventType = 7 + VEventType_UPDATE VEventType = 8 + VEventType_DELETE VEventType = 9 + VEventType_SET VEventType = 10 + // OTHER is a dummy event. If encountered, the current GTID must be + // recorded by the client to be able to resume. + VEventType_OTHER VEventType = 11 + VEventType_ROW VEventType = 12 + VEventType_FIELD VEventType = 13 + // HEARTBEAT is sent if there is inactivity. If a client does not + // receive events beyond the heartbeat interval, it can assume that it's + // lost connection to the vstreamer. 
VEventType_HEARTBEAT VEventType = 14 - VEventType_VGTID VEventType = 15 - VEventType_JOURNAL VEventType = 16 + // VGTID is generated by VTGate's VStream that combines multiple + // GTIDs. + VEventType_VGTID VEventType = 15 + VEventType_JOURNAL VEventType = 16 ) var VEventType_name = map[int32]string{ @@ -596,14 +603,28 @@ func (m *StreamTablesResponse) GetBinlogTransaction() *BinlogTransaction { return nil } -// Rule represents one rule. +// Rule represents one rule in a Filter. type Rule struct { - // match can be a table name or a regular expression - // delineated by '/' and '/'. + // Match can be a table name or a regular expression. + // If it starts with a '/', it's a regular expression. + // For example, "t" matches a table named "t", whereas + // "/t.*" matches all tables that begin with 't'. Match string `protobuf:"bytes,1,opt,name=match,proto3" json:"match,omitempty"` - // filter can be an empty string or keyrange if the match - // is a regular expression. Otherwise, it must be a select - // query. + // Filter: If empty, all columns and rows of the matching tables + // are sent. If it's a keyrange like "-80", only rows that + // match the keyrange are sent. + // If Match is a table name instead of a regular expression, + // the Filter can also be a select expression like this: + // "select * from t", same as an empty Filter, or + // "select * from t where in_keyrange('-80')", same as "-80", or + // "select col1, col2 from t where in_keyrange(col1, 'hash', '-80')". + // What is allowed in a select expression depends on whether + // it's a vstreamer or vreplication request. For more details, + // please refer to the specific package documentation. + // On the vreplication side, Filter can also accept a special + // "exclude" value, which will cause the matched tables + // to be excluded. + // TODO(sougou): support this on vstreamer side also. 
Filter string `protobuf:"bytes,2,opt,name=filter,proto3" json:"filter,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -649,10 +670,18 @@ func (m *Rule) GetFilter() string { return "" } -// Filter represents a list of ordered rules. First match -// wins. +// Filter represents a list of ordered rules. The first +// match wins. type Filter struct { - Rules []*Rule `protobuf:"bytes,1,rep,name=rules,proto3" json:"rules,omitempty"` + Rules []*Rule `protobuf:"bytes,1,rep,name=rules,proto3" json:"rules,omitempty"` + // FieldEventMode specifies the behavior if there is a mismatch + // between the current schema and the fields in the binlog. This + // can happen if the binlog position is before a DDL that would + // cause the fields to change. If vstreamer detects such + // an inconsistency, the behavior depends on the FieldEventMode. + // If the value is ERR_ON_MISMATCH (default), then it errors out. + // If it's BEST_EFFORT, it sends a field event with fake column + // names as "@1", "@2", etc. FieldEventMode Filter_FieldEventMode `protobuf:"varint,2,opt,name=fieldEventMode,proto3,enum=binlogdata.Filter_FieldEventMode" json:"fieldEventMode,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -699,8 +728,8 @@ func (m *Filter) GetFieldEventMode() Filter_FieldEventMode { } // BinlogSource specifies the source and filter parameters for -// Filtered Replication. It currently supports a keyrange -// or a list of tables. +// Filtered Replication. KeyRange and Tables are legacy. Filter +// is the new way to specify the filtering rules. 
type BinlogSource struct { // the source keyspace Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` @@ -708,19 +737,19 @@ type BinlogSource struct { Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"` // the source tablet type TabletType topodata.TabletType `protobuf:"varint,3,opt,name=tablet_type,json=tabletType,proto3,enum=topodata.TabletType" json:"tablet_type,omitempty"` - // key_range is set if the request is for a keyrange + // KeyRange is set if the request is for a keyrange KeyRange *topodata.KeyRange `protobuf:"bytes,4,opt,name=key_range,json=keyRange,proto3" json:"key_range,omitempty"` - // tables is set if the request is for a list of tables + // Tables is set if the request is for a list of tables Tables []string `protobuf:"bytes,5,rep,name=tables,proto3" json:"tables,omitempty"` - // filter is set if we're using the generalized representation + // Filter is set if we're using the generalized representation // for the filter. Filter *Filter `protobuf:"bytes,6,opt,name=filter,proto3" json:"filter,omitempty"` - // on_ddl specifies the action to be taken when a DDL is encountered. + // OnDdl specifies the action to be taken when a DDL is encountered. OnDdl OnDDLAction `protobuf:"varint,7,opt,name=on_ddl,json=onDdl,proto3,enum=binlogdata.OnDDLAction" json:"on_ddl,omitempty"` // Source is an external mysql. This attribute should be set to the username // to use in the connection ExternalMysql string `protobuf:"bytes,8,opt,name=external_mysql,json=externalMysql,proto3" json:"external_mysql,omitempty"` - // stop_after_copy specifies if vreplication should be stopped + // StopAfterCopy specifies if vreplication should be stopped // after copying is done. 
StopAfterCopy bool `protobuf:"varint,9,opt,name=stop_after_copy,json=stopAfterCopy,proto3" json:"stop_after_copy,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -816,7 +845,10 @@ func (m *BinlogSource) GetStopAfterCopy() bool { return false } -// RowChange represents one row change +// RowChange represents one row change. +// If Before is set and not After, it's a delete. +// If After is set and not Before, it's an insert. +// If both are set, it's an update. type RowChange struct { Before *query.Row `protobuf:"bytes,1,opt,name=before,proto3" json:"before,omitempty"` After *query.Row `protobuf:"bytes,2,opt,name=after,proto3" json:"after,omitempty"` @@ -864,7 +896,7 @@ func (m *RowChange) GetAfter() *query.Row { return nil } -// RowEvent represent row events for one table +// RowEvent represent row events for one table. type RowEvent struct { TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"` RowChanges []*RowChange `protobuf:"bytes,2,rep,name=row_changes,json=rowChanges,proto3" json:"row_changes,omitempty"` @@ -912,6 +944,7 @@ func (m *RowEvent) GetRowChanges() []*RowChange { return nil } +// FieldEvent represents the field info for a table. type FieldEvent struct { TableName string `protobuf:"bytes,1,opt,name=table_name,json=tableName,proto3" json:"table_name,omitempty"` Fields []*query.Field `protobuf:"bytes,2,rep,name=fields,proto3" json:"fields,omitempty"` @@ -959,6 +992,11 @@ func (m *FieldEvent) GetFields() []*query.Field { return nil } +// ShardGtid contains the GTID position for one shard. +// It's used in a request for requesting a starting position. +// It's used in a response to transmit the current position +// of a shard. It's also used in a Journal to indicate the +// list of targets and shard positions to migrate to. 
type ShardGtid struct { Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"` @@ -1014,6 +1052,7 @@ func (m *ShardGtid) GetGtid() string { return "" } +// A VGtid is a list of ShardGtids. type VGtid struct { ShardGtids []*ShardGtid `protobuf:"bytes,1,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` @@ -1053,6 +1092,7 @@ func (m *VGtid) GetShardGtids() []*ShardGtid { return nil } +// KeyspaceShard represents a keyspace and shard. type KeyspaceShard struct { Keyspace string `protobuf:"bytes,1,opt,name=keyspace,proto3" json:"keyspace,omitempty"` Shard string `protobuf:"bytes,2,opt,name=shard,proto3" json:"shard,omitempty"` @@ -1100,17 +1140,35 @@ func (m *KeyspaceShard) GetShard() string { return "" } +// Journal contains the metadata for a journal event. +// The commit of a journal event indicates the point of no return +// for a migration. 
type Journal struct { - Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` - MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"` - Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"` - LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"` - ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` - Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"` - SourceWorkflows []string `protobuf:"bytes,7,rep,name=source_workflows,json=sourceWorkflows,proto3" json:"source_workflows,omitempty"` - XXX_NoUnkeyedLiteral struct{} `json:"-"` - XXX_unrecognized []byte `json:"-"` - XXX_sizecache int32 `json:"-"` + // Id represents a unique journal id. + Id int64 `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"` + MigrationType MigrationType `protobuf:"varint,2,opt,name=migration_type,json=migrationType,proto3,enum=binlogdata.MigrationType" json:"migration_type,omitempty"` + // Tables is set if the journal represents a TABLES migration. + Tables []string `protobuf:"bytes,3,rep,name=tables,proto3" json:"tables,omitempty"` + // LocalPosition is the source position at which the migration happened. + LocalPosition string `protobuf:"bytes,4,opt,name=local_position,json=localPosition,proto3" json:"local_position,omitempty"` + // ShardGtids is the list of targets to which the migration took place. + ShardGtids []*ShardGtid `protobuf:"bytes,5,rep,name=shard_gtids,json=shardGtids,proto3" json:"shard_gtids,omitempty"` + // Participants is the list of source participants for a migration. + // Every participant is expected to have an identical journal entry. 
+ // While streaming, the client must wait for the journal entry to + // be received from all participants, and then replace them with new + // streams specified by ShardGtid. + // If a stream does not have all participants, a consistent migration + // is not possible. + Participants []*KeyspaceShard `protobuf:"bytes,6,rep,name=participants,proto3" json:"participants,omitempty"` + // SourceWorkflows is the list of workflows in the source shard that + // were migrated to the target. If a migration fails after a Journal + // is committed, this information is used to start the target streams + // that were created prior to the creation of the journal. + SourceWorkflows []string `protobuf:"bytes,7,rep,name=source_workflows,json=sourceWorkflows,proto3" json:"source_workflows,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` } func (m *Journal) Reset() { *m = Journal{} } @@ -1187,18 +1245,36 @@ func (m *Journal) GetSourceWorkflows() []string { return nil } -// VEvent represents a vstream event +// VEvent represents a vstream event. +// A FieldEvent is sent once for every table, just before +// the first event for that table. The client is expected +// to cache this information and match it against the RowEvent +// which contains the table name. +// A GTID event always precedes a committable event, which can be +// COMMIT, DDL or OTHER. +// OTHER events are non-material events that have no additional metadata. 
type VEvent struct { - Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` - Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` - Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` - Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"` - RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"` + Type VEventType `protobuf:"varint,1,opt,name=type,proto3,enum=binlogdata.VEventType" json:"type,omitempty"` + // Timestamp is the binlog timestamp in seconds. + // It's set for all events. + Timestamp int64 `protobuf:"varint,2,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + // Gtid is set if the event type is GTID. + Gtid string `protobuf:"bytes,3,opt,name=gtid,proto3" json:"gtid,omitempty"` + // Ddl is set if the event type is DDL. + Ddl string `protobuf:"bytes,4,opt,name=ddl,proto3" json:"ddl,omitempty"` + // RowEvent is set if the event type is ROW. + RowEvent *RowEvent `protobuf:"bytes,5,opt,name=row_event,json=rowEvent,proto3" json:"row_event,omitempty"` + // FieldEvent is set if the event type is FIELD. FieldEvent *FieldEvent `protobuf:"bytes,6,opt,name=field_event,json=fieldEvent,proto3" json:"field_event,omitempty"` - Vgtid *VGtid `protobuf:"bytes,7,opt,name=vgtid,proto3" json:"vgtid,omitempty"` - Journal *Journal `protobuf:"bytes,8,opt,name=journal,proto3" json:"journal,omitempty"` - Dml string `protobuf:"bytes,9,opt,name=dml,proto3" json:"dml,omitempty"` - // current_time specifies the current time to handle clock skew. + // Vgtid is set if the event type is VGTID. + // This event is only generated by VTGate's VStream function. + Vgtid *VGtid `protobuf:"bytes,7,opt,name=vgtid,proto3" json:"vgtid,omitempty"` + // Journal is set if the event type is JOURNAL. 
+ Journal *Journal `protobuf:"bytes,8,opt,name=journal,proto3" json:"journal,omitempty"` + // Dml is set if the event type is INSERT, REPLACE, UPDATE or DELETE. + Dml string `protobuf:"bytes,9,opt,name=dml,proto3" json:"dml,omitempty"` + // CurrentTime specifies the current time when the message was sent. + // This can be used to compensate for clock skew. CurrentTime int64 `protobuf:"varint,20,opt,name=current_time,json=currentTime,proto3" json:"current_time,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` @@ -1300,7 +1376,7 @@ func (m *VEvent) GetCurrentTime() int64 { return 0 } -// VStreamRequest is the payload for VStream +// VStreamRequest is the payload for VStreamer type VStreamRequest struct { EffectiveCallerId *vtrpc.CallerID `protobuf:"bytes,1,opt,name=effective_caller_id,json=effectiveCallerId,proto3" json:"effective_caller_id,omitempty"` ImmediateCallerId *query.VTGateCallerID `protobuf:"bytes,2,opt,name=immediate_caller_id,json=immediateCallerId,proto3" json:"immediate_caller_id,omitempty"` @@ -1372,7 +1448,7 @@ func (m *VStreamRequest) GetFilter() *Filter { return nil } -// VStreamResponse is the response from VStream +// VStreamResponse is the response from VStreamer type VStreamResponse struct { Events []*VEvent `protobuf:"bytes,1,rep,name=events,proto3" json:"events,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` diff --git a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go index 95104a0a944..808d4d87437 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go +++ b/go/vt/vttablet/tabletmanager/vreplication/table_plan_builder.go @@ -29,6 +29,7 @@ import ( ) // ExcludeStr is the filter value for excluding tables that match a rule. +// TODO(sougou): support this on vstreamer side also. 
const ExcludeStr = "exclude" type tablePlanBuilder struct { diff --git a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go index cc482dc5e21..3315f9a941d 100644 --- a/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go +++ b/go/vt/vttablet/tabletserver/vstreamer/vstreamer.go @@ -64,6 +64,7 @@ func NewVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, // TODO(sougou): find a better way for this. var vschemaUpdateCount sync2.AtomicInt64 +// vstreamer is for serving a single vreplication stream on the source side. type vstreamer struct { ctx context.Context cancel func() @@ -85,12 +86,31 @@ type vstreamer struct { } // streamerPlan extends the original plan to also include -// the TableMap which is used to extract values from the binlog events. +// the TableMap, which comes from the binlog. It's used +// to extract values from the ROW events. type streamerPlan struct { *Plan TableMap *mysql.TableMap } +// newVStreamer creates a new vstreamer. +// cp: the mysql conn params. +// se: the schema engine. The vstreamer uses it to convert the TableMap into field info. +// startPos: a flavor compliant position to stream from. This can also contain the special +// value "current", which means start from the current position. +// filter: the list of filtering rules. If a rule has a select expression for its filter, +// the select list can only reference direct columns. No other expressions are allowed. +// The select expression is allowed to contain the special 'keyspace_id()' function which +// will return the keyspace id of the row. Examples: +// "select * from t", same as an empty Filter, +// "select * from t where in_keyrange('-80')", same as "-80", +// "select * from t where in_keyrange(col1, 'hash', '-80')", +// "select col1, col2 from t where...", +// "select col1, keyspace_id() from t where...". +// Only "in_keyrange" expressions are supported in the where clause. 
+// Other constructs like joins, group by, etc. are not supported. +// vschema: the current vschema. This value can later be changed through the SetVSchema method. +// send: callback function to send events. func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, startPos string, filter *binlogdatapb.Filter, vschema *localVSchema, send func([]*binlogdatapb.VEvent) error) *vstreamer { ctx, cancel := context.WithCancel(ctx) return &vstreamer{ @@ -107,7 +127,7 @@ func newVStreamer(ctx context.Context, cp *mysql.ConnParams, se *schema.Engine, } } -// SetVSchema updates all existing streams against the new vschema. +// SetVSchema updates the vstreamer against the new vschema. func (vs *vstreamer) SetVSchema(vschema *localVSchema) { // Since vs.Stream is a single-threaded loop. We just send an event to // that thread, which helps us avoid mutexes to update the plans. @@ -117,14 +137,16 @@ func (vs *vstreamer) SetVSchema(vschema *localVSchema) { } } +// Cancel stops the streaming. func (vs *vstreamer) Cancel() { vs.cancel() } -// Stream runs a single-threaded loop. +// Stream streams binlog events. func (vs *vstreamer) Stream() error { defer vs.cancel() + // Validate the request against the current position. curPos, err := vs.currentPosition() if err != nil { return vterrors.Wrap(err, "could not obtain current position") @@ -171,13 +193,22 @@ func (vs *vstreamer) currentPosition() (mysql.Position, error) { return conn.MasterPosition() } +// parseEvents parses and sends events. func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.BinlogEvent) error { // bufferAndTransmit uses bufferedEvents and curSize to buffer events. var ( bufferedEvents []*binlogdatapb.VEvent curSize int ) - // Buffering only takes row lengths into consideration. + // Only the following patterns are possible: + // BEGIN->ROWs or Statements->GTID->COMMIT. In the case of large transactions, this can be broken into chunks. 
+ // BEGIN->JOURNAL->GTID->COMMIT + // GTID->DDL + // GTID->OTHER + // HEARTBEAT is issued if there's inactivity, which is likely + // to happen between one group of events and another. + // + // Buffering only takes row or statement lengths into consideration. // Length of other events is considered negligible. // If a new row event causes the packet size to be exceeded, // all existing rows are sent without the new row. @@ -186,9 +217,14 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog switch vevent.Type { case binlogdatapb.VEventType_GTID, binlogdatapb.VEventType_BEGIN, binlogdatapb.VEventType_FIELD, binlogdatapb.VEventType_JOURNAL: // We never have to send GTID, BEGIN, FIELD events on their own. + // A JOURNAL event is always preceded by a BEGIN and followed by a COMMIT. + // So, we don't have to send it right away. bufferedEvents = append(bufferedEvents, vevent) case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_DDL, binlogdatapb.VEventType_OTHER, binlogdatapb.VEventType_HEARTBEAT: // COMMIT, DDL, OTHER and HEARTBEAT must be immediately sent. + // Although unlikely, it's possible to get a HEARTBEAT in the middle + // of a transaction. If so, we still send the partial transaction along + // with the heartbeat. bufferedEvents = append(bufferedEvents, vevent) vevents := bufferedEvents bufferedEvents = nil @@ -287,8 +323,8 @@ func (vs *vstreamer) parseEvents(ctx context.Context, events <-chan mysql.Binlog } } +// parseEvent parses an event from the binlog and converts it to a list of VEvents. func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, error) { - // Validate the buffer before reading fields from it. if !ev.IsValid() { return nil, fmt.Errorf("can't parse binlog event: invalid data: %#v", ev) } @@ -428,6 +464,11 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e } case ev.IsTableMap(): // This is very frequent. It precedes every row event. 
+ // If it's the first time for a table, we generate a FIELD + // event, and also cache the plan. Subsequent TableMap events + // for that table id don't generate VEvents. + // A schema change will result in a change in table id, which + // will generate a new plan and FIELD event. id := ev.TableID(vs.format) if _, ok := vs.plans[id]; ok { return nil, nil @@ -437,6 +478,7 @@ func (vs *vstreamer) parseEvent(ev mysql.BinlogEvent) ([]*binlogdatapb.VEvent, e return nil, err } if tm.Database == "_vt" && tm.Name == "resharding_journal" { + // A journal is a special case that generates a JOURNAL event. return nil, vs.buildJournalPlan(id, tm) } if tm.Database != "" && tm.Database != vs.cp.DbName { @@ -493,6 +535,9 @@ func (vs *vstreamer) buildJournalPlan(id uint64, tm *mysql.TableMap) error { Name: "_vt.resharding_journal", Columns: st.Columns[:len(tm.Types)], } + // Build a normal table plan, which means, return all rows + // and columns as is. Special handling is done when we actually + // receive the row event. We'll build a JOURNAL event instead. plan, err := buildREPlan(table, nil, "") if err != nil { return err @@ -584,24 +629,27 @@ nextrow: return nil, err } if !afterOK { + // This can happen if someone manually deleted rows. continue } + // Exclude events that don't match the db_name. 
for i, fld := range plan.fields() { - switch fld.Name { - case "db_name": - if afterValues[i].ToString() != vs.cp.DbName { - continue nextrow - } - case "val": - journal := &binlogdatapb.Journal{} - if err := proto.UnmarshalText(afterValues[i].ToString(), journal); err != nil { - return nil, err - } - vevents = append(vevents, &binlogdatapb.VEvent{ - Type: binlogdatapb.VEventType_JOURNAL, - Journal: journal, - }) + if fld.Name == "db_name" && afterValues[i].ToString() != vs.cp.DbName { + continue nextrow + } + } + for i, fld := range plan.fields() { + if fld.Name != "val" { + continue + } + journal := &binlogdatapb.Journal{} + if err := proto.UnmarshalText(afterValues[i].ToString(), journal); err != nil { + return nil, err } + vevents = append(vevents, &binlogdatapb.VEvent{ + Type: binlogdatapb.VEventType_JOURNAL, + Journal: journal, + }) } } return vevents, nil diff --git a/proto/binlogdata.proto b/proto/binlogdata.proto index 110ba8902c7..22dbaec214a 100644 --- a/proto/binlogdata.proto +++ b/proto/binlogdata.proto @@ -115,25 +115,47 @@ message StreamTablesResponse { BinlogTransaction binlog_transaction = 1; } -// Rule represents one rule. +// Rule represents one rule in a Filter. message Rule { - // match can be a table name or a regular expression - // delineated by '/' and '/'. + // Match can be a table name or a regular expression. + // If it starts with a '/', it's a regular expression. + // For example, "t" matches a table named "t", whereas + // "/t.*" matches all tables that begin with 't'. string match = 1; - // filter can be an empty string or keyrange if the match - // is a regular expression. Otherwise, it must be a select - // query. + // Filter: If empty, all columns and rows of the matching tables + // are sent. If it's a keyrange like "-80", only rows that + // match the keyrange are sent. 
+ // If Match is a table name instead of a regular expression, + // the Filter can also be a select expression like this: + // "select * from t", same as an empty Filter, or + // "select * from t where in_keyrange('-80')", same as "-80", or + // "select col1, col2 from t where in_keyrange(col1, 'hash', '-80')". + // What is allowed in a select expression depends on whether + // it's a vstreamer or vreplication request. For more details, + // please refer to the specific package documentation. + // On the vreplication side, Filter can also accept a special + // "exclude" value, which will cause the matched tables + // to be excluded. + // TODO(sougou): support this on vstreamer side also. string filter = 2; } -// Filter represents a list of ordered rules. First match -// wins. +// Filter represents a list of ordered rules. The first +// match wins. message Filter { repeated Rule rules = 1; enum FieldEventMode { ERR_ON_MISMATCH = 0; BEST_EFFORT = 1; } + // FieldEventMode specifies the behavior if there is a mismatch + // between the current schema and the fields in the binlog. This + // can happen if the binlog position is before a DDL that would + // cause the fields to change. If vstreamer detects such + // an inconsistency, the behavior depends on the FieldEventMode. + // If the value is ERR_ON_MISMATCH (default), then it errors out. + // If it's BEST_EFFORT, it sends a field event with fake column + // names as "@1", "@2", etc. FieldEventMode fieldEventMode = 2; } @@ -146,8 +168,8 @@ enum OnDDLAction { } // BinlogSource specifies the source and filter parameters for -// Filtered Replication. It currently supports a keyrange -// or a list of tables. +// Filtered Replication. KeyRange and Tables are legacy. Filter +// is the new way to specify the filtering rules. 
message BinlogSource { // the source keyspace string keyspace = 1; @@ -158,30 +180,29 @@ message BinlogSource { // the source tablet type topodata.TabletType tablet_type = 3; - // key_range is set if the request is for a keyrange + // KeyRange is set if the request is for a keyrange topodata.KeyRange key_range = 4; - // tables is set if the request is for a list of tables + // Tables is set if the request is for a list of tables repeated string tables = 5; - // filter is set if we're using the generalized representation + // Filter is set if we're using the generalized representation // for the filter. Filter filter = 6; - // on_ddl specifies the action to be taken when a DDL is encountered. + // OnDdl specifies the action to be taken when a DDL is encountered. OnDDLAction on_ddl = 7; // Source is an external mysql. This attribute should be set to the username // to use in the connection string external_mysql = 8; - // stop_after_copy specifies if vreplication should be stopped + // StopAfterCopy specifies if vreplication should be stopped // after copying is done. bool stop_after_copy = 9; } -// VEventType enumerates the event types. -// This list is comprehensive. Many of these types +// VEventType enumerates the event types. Many of these types // will not be encountered in RBR mode. enum VEventType { UNKNOWN = 0; @@ -190,46 +211,65 @@ enum VEventType { COMMIT = 3; ROLLBACK = 4; DDL = 5; + // INSERT, REPLACE, UPDATE, DELETE and SET will not be seen in RBR mode. INSERT = 6; REPLACE = 7; UPDATE = 8; DELETE = 9; SET = 10; + // OTHER is a dummy event. If encountered, the current GTID must be + // recorded by the client to be able to resume. OTHER = 11; ROW = 12; FIELD = 13; + // HEARTBEAT is sent if there is inactivity. If a client does not + // receive events beyond the heartbeat interval, it can assume that it's + // lost connection to the vstreamer. HEARTBEAT = 14; + // VGTID is generated by VTGate's VStream that combines multiple + // GTIDs. 
VGTID = 15; JOURNAL = 16; } -// RowChange represents one row change +// RowChange represents one row change. +// If Before is set and not After, it's a delete. +// If After is set and not Before, it's an insert. +// If both are set, it's an update. message RowChange { query.Row before = 1; query.Row after = 2; } -// RowEvent represent row events for one table +// RowEvent represent row events for one table. message RowEvent { string table_name = 1; repeated RowChange row_changes = 2; } +// FieldEvent represents the field info for a table. message FieldEvent { string table_name = 1; repeated query.Field fields = 2; } +// ShardGtid contains the GTID position for one shard. +// It's used in a request for requesting a starting position. +// It's used in a response to transmit the current position +// of a shard. It's also used in a Journal to indicate the +// list of targets and shard positions to migrate to. message ShardGtid { string keyspace = 1; string shard = 2; string gtid = 3; } +// A VGtid is a list of ShardGtids. message VGtid { repeated ShardGtid shard_gtids = 1; } +// KeyspaceShard represents a keyspace and shard. message KeyspaceShard { string keyspace = 1; string shard = 2; @@ -241,32 +281,68 @@ enum MigrationType { SHARDS = 1; } +// Journal contains the metadata for a journal event. +// The commit of a journal event indicates the point of no return +// for a migration. message Journal { + // Id represents a unique journal id. int64 id = 1; MigrationType migration_type = 2; + // Tables is set if the journal represents a TABLES migration. repeated string tables = 3; + // LocalPosition is the source position at which the migration happened. string local_position = 4; + // ShardGtids is the list of targets to which the migration took place. repeated ShardGtid shard_gtids = 5; + // Participants is the list of source participants for a migration. + // Every participant is expected to have an identical journal entry. 
+ // While streaming, the client must wait for the journal entry to + // be received from all participants, and then replace them with new + // streams specified by ShardGtid. + // If a stream does not have all participants, a consistent migration + // is not possible. repeated KeyspaceShard participants = 6; + // SourceWorkflows is the list of workflows in the source shard that + // were migrated to the target. If a migration fails after a Journal + // is committed, this information is used to start the target streams + // that were created prior to the creation of the journal. repeated string source_workflows = 7; } -// VEvent represents a vstream event +// VEvent represents a vstream event. +// A FieldEvent is sent once for every table, just before +// the first event for that table. The client is expected +// to cache this information and match it against the RowEvent +// which contains the table name. +// A GTID event always precedes a committable event, which can be +// COMMIT, DDL or OTHER. +// OTHER events are non-material events that have no additional metadata. message VEvent { VEventType type = 1; + // Timestamp is the binlog timestamp in seconds. + // It's set for all events. int64 timestamp = 2; + // Gtid is set if the event type is GTID. string gtid = 3; + // Ddl is set if the event type is DDL. string ddl = 4; + // RowEvent is set if the event type is ROW. RowEvent row_event = 5; + // FieldEvent is set if the event type is FIELD. FieldEvent field_event = 6; + // Vgtid is set if the event type is VGTID. + // This event is only generated by VTGate's VStream function. VGtid vgtid = 7; + // Journal is set if the event type is JOURNAL. Journal journal = 8; + // Dml is set if the event type is INSERT, REPLACE, UPDATE or DELETE. string dml = 9; - // current_time specifies the current time to handle clock skew. + // CurrentTime specifies the current time when the message was sent. + // This can be used to compensate for clock skew. 
int64 current_time = 20; } -// VStreamRequest is the payload for VStream +// VStreamRequest is the payload for VStreamer message VStreamRequest { vtrpc.CallerID effective_caller_id = 1; query.VTGateCallerID immediate_caller_id = 2; @@ -276,7 +352,7 @@ message VStreamRequest { Filter filter = 5; } -// VStreamResponse is the response from VStream +// VStreamResponse is the response from VStreamer message VStreamResponse { repeated VEvent events = 1; }