Skip to content

Commit

Permalink
*: add Writer for relay log (pingcap#117)
Browse files Browse the repository at this point in the history
  • Loading branch information
csuzhangxc authored May 22, 2019
1 parent 1100017 commit d771f5f
Show file tree
Hide file tree
Showing 27 changed files with 2,955 additions and 106 deletions.
20 changes: 11 additions & 9 deletions relay/reader/stage.go → pkg/binlog/common/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package reader
package common

type readerStage int32
// Stage represents a stage for some binlog related objects, like Writer.Reader.
type Stage int32

// valid stages for binlog related objects.
const (
stageNew readerStage = iota
stagePrepared
stageClosed
StageNew Stage = iota
StagePrepared
StageClosed
)

// String implements Stringer.String.
func (s readerStage) String() string {
func (s Stage) String() string {
switch s {
case stageNew:
case StageNew:
return "new"
case stagePrepared:
case StagePrepared:
return "prepared"
case stageClosed:
case StageClosed:
return "closed"
default:
return "unknown"
Expand Down
59 changes: 59 additions & 0 deletions pkg/binlog/common/stage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"testing"

. "github.com/pingcap/check"
)

func TestSuite(t *testing.T) {
TestingT(t)
}

var (
_ = Suite(&testStageSuite{})
)

type testStageSuite struct {
}

func (t *testStageSuite) TestStageString(c *C) {
cases := []struct {
stage Stage
str string
}{
{
stage: StageNew,
str: "new",
},
{
stage: StagePrepared,
str: "prepared",
},
{
stage: StageClosed,
str: "closed",
},
{
stage: Stage(100),
str: "unknown",
},
}

for _, cs := range cases {
c.Assert(cs.stage.String(), Equals, cs.str)
}
}
14 changes: 5 additions & 9 deletions pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl
ServerID: serverID,
Flags: defaultHeaderFlags,
}
latestPos = uint32(len(replication.BinLogFileHeader))
prevGTIDsEv *replication.BinlogEvent // for MySQL, this will be nil
prevGTIDsEvData []byte
latestPos = uint32(len(replication.BinLogFileHeader))
prevGTIDsEv *replication.BinlogEvent // for MySQL, this will be a GenericEvent
)

formatDescEv, err := GenFormatDescriptionEvent(header, latestPos)
Expand All @@ -62,12 +61,9 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl

switch flavor {
case gmysql.MySQLFlavor:
prevGTIDsEvData, err = GenPreviousGTIDsEvent(header, latestPos, gSet)
prevGTIDsEv, err = GenPreviousGTIDsEvent(header, latestPos, gSet)
case gmysql.MariaDBFlavor:
prevGTIDsEv, err = GenMariaDBGTIDListEvent(header, latestPos, gSet)
if err == nil {
prevGTIDsEvData = prevGTIDsEv.RawData
}
default:
return nil, nil, errors.NotSupportedf("flavor %s", flavor)
}
Expand All @@ -84,9 +80,9 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl
if err != nil {
return nil, nil, errors.Annotatef(err, "write FormatDescriptionEvent % X", formatDescEv.RawData)
}
_, err = buf.Write(prevGTIDsEvData)
_, err = buf.Write(prevGTIDsEv.RawData)
if err != nil {
return nil, nil, errors.Annotatef(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEvData)
return nil, nil, errors.Annotatef(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEv.RawData)
}

events := []*replication.BinlogEvent{formatDescEv, prevGTIDsEv}
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
c.Assert(err, IsNil)
c.Assert(len(events), Equals, 2)
c.Assert(events[0].Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT)
c.Assert(events[1], IsNil) // for MySQL, no real PreviousGTIDsEvent
c.Assert(events[1].Header.EventType, Equals, replication.PREVIOUS_GTIDS_EVENT)

// write to file then parse it
dir := c.MkDir()
Expand Down
76 changes: 69 additions & 7 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const (
eventHeaderLen = uint8(replication.EventHeaderSize) // always 19
crc32Len uint32 = 4 // CRC32-length
tableMapFlags uint16 = 1 // flags in TableMapEvent's post-header, not used yet

// MinUserVarEventLen represents the minimum event length for a USER_VAR_EVENT with checksum
MinUserVarEventLen = uint32(eventHeaderLen+4+1+1) + crc32Len // 29 bytes
// MinQueryEventLen represents the minimum event length for a QueryEvent with checksum
MinQueryEventLen = uint32(eventHeaderLen+4+4+1+2+2+1+1) + crc32Len // 38 bytes
)

var (
Expand All @@ -57,6 +62,10 @@ var (
0x04, 0x1a, 0x08, 0x00, 0x00, 0x00, 0x08, 0x08, 0x08, 0x02, 0x00, 0x00, 0x00, 0x0a, 0x0a, 0x0a,
0x2a, 0x2a, 0x00, 0x12, 0x34, 0x00,
}
// user var name used in dummy USER_VAR_EVENT
dummyUserVarName = []byte("!dummyvar")
// dummy (commented) query in a QueryEvent
dummyQuery = []byte("# dummy query, often used to fill a hole in a binlog file")
)

// GenEventHeader generates a EventHeader's raw data according to a passed-in EventHeader struct.
Expand Down Expand Up @@ -187,12 +196,12 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa
}

// GenPreviousGTIDsEvent generates a PreviousGTIDsEvent.
// go-mysql has no PreviousGTIDsEvent struct defined, so return the event's raw data instead.
// go-mysql has no PreviousGTIDsEvent struct defined, so return a GenericEvent instead.
// MySQL has no internal doc for PREVIOUS_GTIDS_EVENT.
// we ref:
// a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56
// b. https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) ([]byte, error) {
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
if gSet == nil || len(gSet.String()) == 0 {
return nil, errors.NotValidf("empty GTID set")
}
Expand All @@ -206,11 +215,9 @@ func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gS
payload := origin.Encode()

buf := new(bytes.Buffer)
_, err := assembleEvent(buf, nil, false, *header, replication.PREVIOUS_GTIDS_EVENT, latestPos, nil, payload)
if err != nil {
return nil, errors.Trace(err)
}
return buf.Bytes(), nil
event := &replication.GenericEvent{} // no PreviousGTIDsEvent struct defined, so use a GenericEvent instead.
ev, err := assembleEvent(buf, event, false, *header, replication.PREVIOUS_GTIDS_EVENT, latestPos, nil, payload)
return ev, errors.Trace(err)
}

// GenGTIDEvent generates a GTIDEvent.
Expand Down Expand Up @@ -788,3 +795,58 @@ func GenMariaDBGTIDEvent(header *replication.EventHeader, latestPos uint32, sequ
ev, err := assembleEvent(buf, event, false, *header, replication.MARIADB_GTID_EVENT, latestPos, nil, payload.Bytes())
return ev, errors.Trace(err)
}

// GenDummyEvent generates a dummy QueryEvent or a dummy USER_VAR_EVENT.
// Dummy events often used to fill the holes in a relay log file which lacking some events from the master.
// The minimum size is 29 bytes (19 bytes header + 6 bytes body for a USER_VAR_EVENT + 4 bytes checksum).
// ref: https://dev.mysql.com/doc/internals/en/user-var-event.html
// ref: https://github.com/MariaDB/server/blob/a765b19e5ca31a3d866cdbc8bef3a6f4e5e44688/sql/log_event.cc#L4950
func GenDummyEvent(header *replication.EventHeader, latestPos uint32, eventSize uint32) (*replication.BinlogEvent, error) {
if eventSize < MinUserVarEventLen {
return nil, errors.Errorf("required dummy event size (%d) is too small, the minimum supported size is %d", eventSize, MinUserVarEventLen)
}

// modify flag in the header
headerClone := *header // do a copy
headerClone.Flags &= ^replication.LOG_EVENT_THREAD_SPECIFIC_F
headerClone.Flags |= replication.LOG_EVENT_SUPPRESS_USE_F
headerClone.Flags |= replication.LOG_EVENT_RELAY_LOG_F // now, the dummy event created by relay only

if eventSize < MinQueryEventLen {
// generate a USER_VAR_EVENT
var (
payload = new(bytes.Buffer)
buf = new(bytes.Buffer)
event = &replication.GenericEvent{}
eventType = replication.USER_VAR_EVENT
nameLen = eventSize - (MinUserVarEventLen - 1)
nameBytes = make([]byte, nameLen)
)
copy(nameBytes, dummyUserVarName)
// name_length, 4 bytes
err := binary.Write(payload, binary.LittleEndian, uint32(nameLen))
if err != nil {
return nil, errors.Annotatef(err, "write USER_VAR_EVENT name length %d", nameLen)
}
// name, name_length bytes (now, at least 1 byte)
err = binary.Write(payload, binary.LittleEndian, nameBytes)
if err != nil {
return nil, errors.Annotatef(err, "write USER_VAR_EVENT name % X", nameBytes)
}
// is_null, 1 byte
isNull := byte(1) // always is null (no `value` part)
err = binary.Write(payload, binary.LittleEndian, isNull)
if err != nil {
return nil, errors.Annotatef(err, "write USER_VAR_EVENT is-null % X", isNull)
}
ev, err := assembleEvent(buf, event, false, headerClone, eventType, latestPos, nil, payload.Bytes())
return ev, errors.Trace(err)
}

// generate a QueryEvent
queryLen := eventSize - (MinQueryEventLen - 1)
queryBytes := make([]byte, queryLen)
copy(queryBytes, dummyQuery)
ev, err := GenQueryEvent(&headerClone, latestPos, 0, 0, 0, nil, nil, queryBytes)
return ev, errors.Trace(err)
}
Loading

0 comments on commit d771f5f

Please sign in to comment.