Skip to content
This repository has been archived by the owner on Nov 24, 2023. It is now read-only.

*: add Writer for relay log #117

Merged
merged 63 commits into from
May 22, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
5ae5bec
write: add a binlog file writer
csuzhangxc Apr 16, 2019
98252cc
writer: basic structure
csuzhangxc Apr 18, 2019
1b54643
writer: add test case for starting underlying writer
csuzhangxc Apr 18, 2019
10767be
Update relay/writer/file_test.go
kennytm Apr 19, 2019
6246da8
*: address comments
csuzhangxc Apr 19, 2019
0516ef0
writer: check binlog file header and FormatDescriptionEvent helper fu…
csuzhangxc Apr 19, 2019
d24c04c
*: handle FormatDescriptionEvent in FileWriter
csuzhangxc Apr 19, 2019
b4633ef
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 19, 2019
79f7262
writer: fix CI
csuzhangxc Apr 19, 2019
f89ea7c
writer: handle RotateEvent
csuzhangxc Apr 22, 2019
f2940f6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 22, 2019
4ef41f1
writer: add test for RotateEvent with FormatDescriptionEvent
csuzhangxc Apr 22, 2019
6a28944
event: change GenPreviousGTIDsEvent to return a GenericEvent
csuzhangxc Apr 22, 2019
2b62d66
writer: add test case with DDL and DML
csuzhangxc Apr 22, 2019
d6b2d1e
writer: update offset after events wrote
csuzhangxc Apr 22, 2019
50f4187
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 22, 2019
595f276
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc Apr 24, 2019
8d8f289
event: GenDummyEvent (USER_VAR_EVENT/QueryEvent)
csuzhangxc Apr 24, 2019
18f1367
writer: use dummy event to fill the potential hole
csuzhangxc Apr 25, 2019
ed3015c
writer: check event is duplicate
csuzhangxc Apr 25, 2019
298e87c
writer: handle duplicate events
csuzhangxc Apr 25, 2019
ea41ba8
writer: close FileReader in defer
csuzhangxc Apr 25, 2019
866df1e
*: refine code structure
csuzhangxc Apr 25, 2019
5c640fc
event: get GTID set from a PreviousGTIDsEvent
csuzhangxc Apr 25, 2019
b071768
event: get GTID set from a MariaDBGTIDListEvent
csuzhangxc Apr 25, 2019
7d3517b
writer: add Recover method; recover nothing
csuzhangxc Apr 26, 2019
aaf6b1f
writer: truncate uncompleted event part
csuzhangxc Apr 26, 2019
16ad527
writer: test truncate the uncompleted transaction
csuzhangxc Apr 27, 2019
ea9dfd7
writer: refine code
csuzhangxc Apr 27, 2019
e9141e8
writer: get latest GTID set for all completed transactions
csuzhangxc Apr 27, 2019
651dc32
writer: get latest pos/GTID set from MariaDB binlog file
csuzhangxc Apr 27, 2019
d6791f4
writer: test without GTID; test illegal GTID
csuzhangxc Apr 27, 2019
95b30f8
writer: add some comment
csuzhangxc Apr 27, 2019
384f5b0
writer: recover a file to get latest GTID set
csuzhangxc Apr 27, 2019
3fc0091
writer: add a comment
csuzhangxc Apr 27, 2019
9f0a93c
writer: address comment
csuzhangxc Apr 28, 2019
68dddc6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 7, 2019
d4c9499
*: address comments
csuzhangxc May 7, 2019
7f04c2f
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 8, 2019
e824a91
event: address comment
csuzhangxc May 8, 2019
4371856
event: address comments
csuzhangxc May 8, 2019
4586ef4
*: address comments
csuzhangxc May 10, 2019
7827ce8
writer: address comment
csuzhangxc May 10, 2019
677fca6
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 10, 2019
30ff9f2
*: try fix CI
csuzhangxc May 10, 2019
e59c2ed
*: address comment to unify Stage for binlog related objects
csuzhangxc May 12, 2019
dd7487a
writer: address comments
csuzhangxc May 12, 2019
9e3172a
*: address comments
csuzhangxc May 12, 2019
7514e2f
*: address comments
csuzhangxc May 12, 2019
1739d15
*: address comments
csuzhangxc May 13, 2019
abeac9b
*: address comments
csuzhangxc May 13, 2019
60007e1
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 13, 2019
63d1b7b
*: address comments
csuzhangxc May 13, 2019
67393f7
writer: address comments
csuzhangxc May 13, 2019
a4a3b15
writer: address comments
csuzhangxc May 13, 2019
96e108e
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 14, 2019
0d3de7d
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 20, 2019
80fd184
writer: address comment, handleDuplicateEventsExist only when holeSiz…
csuzhangxc May 21, 2019
d536bd4
writer: refine Writer interface
csuzhangxc May 21, 2019
6d54140
writer: return `true` when FormatDescriptionEvent exist for `checkFor…
csuzhangxc May 21, 2019
16e0574
writer: check hole and duplicate for RotateEvent
csuzhangxc May 21, 2019
6802a07
Merge remote-tracking branch 'remotes/origin/master' into relay-writer-2
csuzhangxc May 22, 2019
2f332b0
writer: address comment
csuzhangxc May 22, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions relay/reader/stage.go → pkg/binlog/common/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,24 +11,26 @@
// See the License for the specific language governing permissions and
// limitations under the License.

package reader
package common

type readerStage int32
// Stage represents a stage for some binlog related objects, like Writer.Reader.
type Stage int32

// valid stages for binlog related objects.
const (
stageNew readerStage = iota
stagePrepared
stageClosed
StageNew Stage = iota
StagePrepared
StageClosed
)

// String implements Stringer.String.
func (s readerStage) String() string {
func (s Stage) String() string {
switch s {
case stageNew:
case StageNew:
return "new"
case stagePrepared:
case StagePrepared:
return "prepared"
case stageClosed:
case StageClosed:
return "closed"
default:
return "unknown"
Expand Down
59 changes: 59 additions & 0 deletions pkg/binlog/common/stage_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package common

import (
"testing"

. "github.com/pingcap/check"
)

func TestSuite(t *testing.T) {
TestingT(t)
}

var (
_ = Suite(&testStageSuite{})
)

type testStageSuite struct {
}

func (t *testStageSuite) TestStageString(c *C) {
cases := []struct {
stage Stage
str string
}{
{
stage: StageNew,
str: "new",
},
{
stage: StagePrepared,
str: "prepared",
},
{
stage: StageClosed,
str: "closed",
},
{
stage: Stage(100),
str: "unknown",
},
}

for _, cs := range cases {
c.Assert(cs.stage.String(), Equals, cs.str)
}
}
14 changes: 5 additions & 9 deletions pkg/binlog/event/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,8 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl
ServerID: serverID,
Flags: defaultHeaderFlags,
}
latestPos = uint32(len(replication.BinLogFileHeader))
prevGTIDsEv *replication.BinlogEvent // for MySQL, this will be nil
prevGTIDsEvData []byte
latestPos = uint32(len(replication.BinLogFileHeader))
prevGTIDsEv *replication.BinlogEvent // for MySQL, this will be a GenericEvent
)

formatDescEv, err := GenFormatDescriptionEvent(header, latestPos)
Expand All @@ -62,12 +61,9 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl

switch flavor {
case gmysql.MySQLFlavor:
prevGTIDsEvData, err = GenPreviousGTIDsEvent(header, latestPos, gSet)
prevGTIDsEv, err = GenPreviousGTIDsEvent(header, latestPos, gSet)
case gmysql.MariaDBFlavor:
prevGTIDsEv, err = GenMariaDBGTIDListEvent(header, latestPos, gSet)
if err == nil {
prevGTIDsEvData = prevGTIDsEv.RawData
}
default:
return nil, nil, errors.NotSupportedf("flavor %s", flavor)
}
Expand All @@ -84,9 +80,9 @@ func GenCommonFileHeader(flavor string, serverID uint32, gSet gtid.Set) ([]*repl
if err != nil {
return nil, nil, errors.Annotatef(err, "write FormatDescriptionEvent % X", formatDescEv.RawData)
}
_, err = buf.Write(prevGTIDsEvData)
_, err = buf.Write(prevGTIDsEv.RawData)
if err != nil {
return nil, nil, errors.Annotatef(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEvData)
return nil, nil, errors.Annotatef(err, "write PreviousGTIDsEvent/MariadbGTIDListEvent % X", prevGTIDsEv.RawData)
}

events := []*replication.BinlogEvent{formatDescEv, prevGTIDsEv}
Expand Down
2 changes: 1 addition & 1 deletion pkg/binlog/event/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (t *testCommonSuite) TestGenCommonFileHeader(c *C) {
c.Assert(err, IsNil)
c.Assert(len(events), Equals, 2)
c.Assert(events[0].Header.EventType, Equals, replication.FORMAT_DESCRIPTION_EVENT)
c.Assert(events[1], IsNil) // for MySQL, no real PreviousGTIDsEvent
c.Assert(events[1].Header.EventType, Equals, replication.PREVIOUS_GTIDS_EVENT)

// write to file then parse it
dir := c.MkDir()
Expand Down
76 changes: 69 additions & 7 deletions pkg/binlog/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ const (
eventHeaderLen = uint8(replication.EventHeaderSize) // always 19
crc32Len uint32 = 4 // CRC32-length
tableMapFlags uint16 = 1 // flags in TableMapEvent's post-header, not used yet

// MinUserVarEventLen represents the minimum event length for a USER_VAR_EVENT with checksum
MinUserVarEventLen = uint32(eventHeaderLen+4+1+1) + crc32Len // 29 bytes
// MinQueryEventLen represents the minimum event length for a QueryEvent with checksum
MinQueryEventLen = uint32(eventHeaderLen+4+4+1+2+2+1+1) + crc32Len // 38 bytes
)

var (
Expand All @@ -57,6 +62,10 @@ var (
0x04, 0x1a, 0x08, 0x00, 0x00, 0x00, 0x08, 0x08, 0x08, 0x02, 0x00, 0x00, 0x00, 0x0a, 0x0a, 0x0a,
0x2a, 0x2a, 0x00, 0x12, 0x34, 0x00,
}
// user var name used in dummy USER_VAR_EVENT
dummyUserVarName = []byte("!dummyvar")
// dummy (commented) query in a QueryEvent
dummyQuery = []byte("# dummy query, often used to fill a hole in a binlog file")
)

// GenEventHeader generates a EventHeader's raw data according to a passed-in EventHeader struct.
Expand Down Expand Up @@ -187,12 +196,12 @@ func GenRotateEvent(header *replication.EventHeader, latestPos uint32, nextLogNa
}

// GenPreviousGTIDsEvent generates a PreviousGTIDsEvent.
// go-mysql has no PreviousGTIDsEvent struct defined, so return the event's raw data instead.
// go-mysql has no PreviousGTIDsEvent struct defined, so return a GenericEvent instead.
// MySQL has no internal doc for PREVIOUS_GTIDS_EVENT.
// we ref:
// a. https://github.com/vitessio/vitess/blob/28e7e5503a6c3d3b18d4925d95f23ebcb6f25c8e/go/mysql/binlog_event_mysql56.go#L56
// b. https://dev.mysql.com/doc/internals/en/com-binlog-dump-gtid.html
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) ([]byte, error) {
func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gSet gtid.Set) (*replication.BinlogEvent, error) {
if gSet == nil || len(gSet.String()) == 0 {
return nil, errors.NotValidf("empty GTID set")
}
Expand All @@ -206,11 +215,9 @@ func GenPreviousGTIDsEvent(header *replication.EventHeader, latestPos uint32, gS
payload := origin.Encode()

buf := new(bytes.Buffer)
_, err := assembleEvent(buf, nil, false, *header, replication.PREVIOUS_GTIDS_EVENT, latestPos, nil, payload)
if err != nil {
return nil, errors.Trace(err)
}
return buf.Bytes(), nil
event := &replication.GenericEvent{} // no PreviousGTIDsEvent struct defined, so use a GenericEvent instead.
ev, err := assembleEvent(buf, event, false, *header, replication.PREVIOUS_GTIDS_EVENT, latestPos, nil, payload)
return ev, errors.Trace(err)
}

// GenGTIDEvent generates a GTIDEvent.
Expand Down Expand Up @@ -788,3 +795,58 @@ func GenMariaDBGTIDEvent(header *replication.EventHeader, latestPos uint32, sequ
ev, err := assembleEvent(buf, event, false, *header, replication.MARIADB_GTID_EVENT, latestPos, nil, payload.Bytes())
return ev, errors.Trace(err)
}

// GenDummyEvent generates a dummy QueryEvent or a dummy USER_VAR_EVENT.
// Dummy events often used to fill the holes in a relay log file which lacking some events from the master.
// The minimum size is 29 bytes (19 bytes header + 6 bytes body for a USER_VAR_EVENT + 4 bytes checksum).
// ref: https://dev.mysql.com/doc/internals/en/user-var-event.html
// ref: https://github.com/MariaDB/server/blob/a765b19e5ca31a3d866cdbc8bef3a6f4e5e44688/sql/log_event.cc#L4950
func GenDummyEvent(header *replication.EventHeader, latestPos uint32, eventSize uint32) (*replication.BinlogEvent, error) {
if eventSize < MinUserVarEventLen {
return nil, errors.Errorf("required dummy event size (%d) is too small, the minimum supported size is %d", eventSize, MinUserVarEventLen)
}

// modify flag in the header
headerClone := *header // do a copy
headerClone.Flags &= ^replication.LOG_EVENT_THREAD_SPECIFIC_F
headerClone.Flags |= replication.LOG_EVENT_SUPPRESS_USE_F
headerClone.Flags |= replication.LOG_EVENT_RELAY_LOG_F // now, the dummy event created by relay only

if eventSize < MinQueryEventLen {
// generate a USER_VAR_EVENT
var (
payload = new(bytes.Buffer)
buf = new(bytes.Buffer)
event = &replication.GenericEvent{}
eventType = replication.USER_VAR_EVENT
nameLen = eventSize - (MinUserVarEventLen - 1)
nameBytes = make([]byte, nameLen)
)
copy(nameBytes, dummyUserVarName)
// name_length, 4 bytes
err := binary.Write(payload, binary.LittleEndian, uint32(nameLen))
if err != nil {
return nil, errors.Annotatef(err, "write USER_VAR_EVENT name length %d", nameLen)
}
// name, name_length bytes (now, at least 1 byte)
err = binary.Write(payload, binary.LittleEndian, nameBytes)
if err != nil {
return nil, errors.Annotatef(err, "write USER_VAR_EVENT name % X", nameBytes)
}
// is_null, 1 byte
isNull := byte(1) // always is null (no `value` part)
err = binary.Write(payload, binary.LittleEndian, isNull)
if err != nil {
return nil, errors.Annotatef(err, "write USER_VAR_EVENT is-null % X", isNull)
}
ev, err := assembleEvent(buf, event, false, headerClone, eventType, latestPos, nil, payload.Bytes())
return ev, errors.Trace(err)
}

// generate a QueryEvent
queryLen := eventSize - (MinQueryEventLen - 1)
queryBytes := make([]byte, queryLen)
copy(queryBytes, dummyQuery)
ev, err := GenQueryEvent(&headerClone, latestPos, 0, 0, 0, nil, nil, queryBytes)
return ev, errors.Trace(err)
}
Loading