-
Notifications
You must be signed in to change notification settings - Fork 288
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
relay: add async/batch relay writer #7580
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
/run-all-tests |
/run-all-tests |
/run-all-tests |
/run-all-tests |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no need to accept all comments if it's negligible in performance
dm/relay/binlog_writer.go
Outdated
if w.file == nil { | ||
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) | ||
} | ||
w.input <- rawData | ||
w.offset.Add(int64(len(rawData))) | ||
return w.Error() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since every Write need to check Error, Error is a hot function and we'd better using fast implementation. I prefer atomic variable like using atomic.Error
and CAS to replace nil to an error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest lgtm
if w.file == nil { | ||
return terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened")) | ||
} | ||
w.input <- rawData | ||
w.offset.Add(int64(len(rawData))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BinlogReader
will use this offset to check whether there's need to re-parse binlog, maybe move it after we actual write data.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we use offset to check whether hole exist, so cannot update it later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
tiflow/dm/relay/relay_writer.go
Line 304 in d98f885
fileOffset := w.out.Offset() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use another offset?
dm/relay/binlog_writer.go
Outdated
} | ||
n, err := w.file.Write(buf.Bytes()) | ||
if err != nil { | ||
w.err.CompareAndSwap(nil, terror.ErrBinlogWriterWriteDataLen.Delegate(err, n)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sorry seems the nil
for CAS is not what I expected
func TestCAS(t *testing.T) {
var nilErr error
e := atomic.NewError(nil)
e.Store(nilErr)
require.NoError(t, e.Load())
ok := e.CompareAndSwap(nilErr, errors.New("test"))
require.True(t, ok)
}
Can we use the nilErr
variable or simply Store
? but in the latter we will store the last error not the first one.
dm/relay/binlog_writer.go
Outdated
} | ||
|
||
if w.file == nil { | ||
w.err.CompareAndSwap(nilErr, terror.ErrRelayWriterNotOpened.Delegate(errors.New("file not opened"))) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we simply use Generate()
rather than Delegate
?
dm/relay/relay_writer.go
Outdated
@@ -145,6 +152,10 @@ func (w *FileWriter) handleFormatDescriptionEvent(ev *replication.BinlogEvent) ( | |||
if err != nil { | |||
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) | |||
} | |||
err = w.Flush() | |||
if err != nil { | |||
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return WResult{}, terror.Annotatef(err, "write binlog file header for %s", fullName) | |
return WResult{}, terror.Annotatef(err, "flush binlog file for %s", fullName) |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 71a7896
|
/run-dm-integration-test |
1 similar comment
/run-dm-integration-test |
/run-all-tests |
/merge |
This pull request has been accepted and is ready to merge. Commit hash: 4d0a324
|
/merge |
What problem does this PR solve?
Issue Number: ref #4287
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note