-
Notifications
You must be signed in to change notification settings - Fork 188
relay: use Reader interface read binlog events #92
Conversation
/run-all-tests |
@GregoryIan @amyangfei PTAL |
pkg/binlog/reader/tcp.go
Outdated
// TCPReaderStatus represents the status of a TCPReader. | ||
type TCPReaderStatus struct { | ||
Stage string `json:"stage"` | ||
Connection uint32 `json:"connection"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what's meaning of Connection
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
renamed to ConnID
, but keep the json flag as connection
.
pkg/binlog/reader/tcp.go
Outdated
defer r.stageMu.Unlock() | ||
|
||
if r.stage != stageNew { | ||
return errors.NotValidf("stage %d, expect %d", r.stage, stageNew) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is log stage %d, expect %d is not valid
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to stage %s, expect %s
only.
pkg/binlog/reader/tcp.go
Outdated
r.syncerCfg.User, r.syncerCfg.Password, r.syncerCfg.Host, r.syncerCfg.Port) | ||
db, err := sql.Open("mysql", dsn) | ||
if err != nil { | ||
return errors.Annotate(err, "open connection to the master") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can output which master
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
pkg/binlog/reader/tcp.go
Outdated
defer db.Close() | ||
err = utils.KillConn(db, connID) | ||
if err != nil { | ||
return errors.Annotatef(err, "kill connection %d", connID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
pkg/binlog/reader/tcp.go
Outdated
func (r *TCPReader) GetEvent(ctx context.Context, checkStage bool) (*replication.BinlogEvent, error) { | ||
if checkStage { | ||
r.stageMu.Lock() | ||
if r.stage != stagePrepared { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would atomic operation be better?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
changed to use atomic operation, and removed checkStage
.
pkg/binlog/reader/tcp.go
Outdated
|
||
// StartSyncByGTID implements Reader.StartSyncByGTID. | ||
func (r *TCPReader) StartSyncByGTID(gSet gtid.Set) error { | ||
r.stageMu.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
in fact this lock prevent more than stage
field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, rename to mu
.
Rest LGTM. well done |
/run-all-tests |
pkg/binlog/reader/tcp.go
Outdated
func (r *TCPReader) Status() interface{} { | ||
r.mu.Lock() | ||
stage := r.stage | ||
r.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why only lock for stage getter? besides I find some redundant lock protection, such as
r.mu.Lock()
defer r.mu.Unlock()
r.stage.Get()
Is there any consideration to do that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I change to use stage = r.stage.Get
without lock.
We use stage
both in comparing(L150) and recording(L154).
Saving stage
in L148, so we can get the same value both in L150 and L154.
LGTM |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
What problem does this PR solve?
This PR is a part work of #91
Introduced a binlog event
Reader
interface, so we can implement at least 3 kinds of Reader later:In this PR, I only implement a
TCPReader
and refactor a litter code for relay writer to make review easier.Later, the following things will be done:
What is changed and how it works?
TCPReader
TCPReader
Check List
Tests
Code changes