Fix clickhouse client race during batch commit #4071
/test-all
Codecov Report
@@ Coverage Diff @@
## main #4071 +/- ##
==========================================
+ Coverage 64.21% 67.38% +3.16%
==========================================
Files 294 293 -1
Lines 44555 44288 -267
==========================================
+ Hits 28612 29844 +1232
+ Misses 13612 12143 -1469
+ Partials 2331 2301 -30
602b8cc to 474ebbe
/test-e2e
thanks for taking care of this
@@ -431,11 +433,18 @@ func (ch *ClickHouseExportProcess) batchCommitAll() (int, error) {
    }

    // populate items from deque
    recordsToExport := make([]*ClickHouseFlowRow, 0, currSize)
    ch.mutex.Lock()
    for i := 0; i < currSize; i++ {
it's worth adding a comment explaining why currSize is guaranteed to still be "valid", i.e. at most equal to the actual queue size (it's possible for the queue to have grown since the earlier check, but not possible for the queue to have shrunk). If this is not true, then the code is not correct.
I think it's correct because CacheSet can never shrink the queue by itself. The following loop:
for ch.deque.Len() >= ch.queueSize {
    ch.deque.PopFront()
}
will never remove more than 1 item, and we push a new item right after with ch.deque.PushBack(chRow), so overall the size of the queue either increases by 1 or stays the same.
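A minimal sketch of that invariant, for illustration only. The boundedQueue type, its limit field, and the cacheSet and length methods are hypothetical slice-backed stand-ins (not the real ClickHouseExportProcess or gammazero/deque API), and the limit is assumed to be at least 1:

```go
package main

import "sync"

// boundedQueue is a hypothetical stand-in for the exporter's deque.
// limit plays the role of ch.queueSize and is assumed to be >= 1.
type boundedQueue struct {
	mu    sync.Mutex
	items []int
	limit int
}

// cacheSet has the same shape as the loop quoted above: evict from the
// front while the queue is full, then append the new item at the back.
// When the queue is within its limit, the loop evicts at most one item,
// so the length after the call is never smaller than before it.
func (q *boundedQueue) cacheSet(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) >= q.limit {
		q.items = q.items[1:] // stands in for PopFront
	}
	q.items = append(q.items, v) // stands in for PushBack
}

// length reads the current size under the mutex.
func (q *boundedQueue) length() int {
	q.mu.Lock()
	defer q.mu.Unlock()
	return len(q.items)
}
```

Calling cacheSet repeatedly on a queue with limit 3 grows the length to 3 and then holds it there. That is why a size read earlier can only be too small, never too large, as long as no other code path removes items.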
I think we should rather read the length again while holding the mutex, to be 100% safe. That saves us the detailed explanation, which depends on the implementation and could easily be left stale. We can then add a comment explaining why the size could have changed, instead of one explaining why the code is still correct.
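One way to read this suggestion, as a sketch under assumptions: rowQueue and drainAll are hypothetical names, and a plain slice stands in for the deque. The point is only that the length is read inside the critical section, so no earlier snapshot has to be trusted:

```go
package main

import "sync"

// rowQueue is a hypothetical stand-in for the exporter's deque of rows.
type rowQueue struct {
	mu   sync.Mutex
	rows []string
}

// drainAll removes and returns every row currently queued.
func (q *rowQueue) drainAll() []string {
	q.mu.Lock()
	defer q.mu.Unlock()
	// The length may have changed since any earlier unlocked check
	// (a writer may have added rows), so read it again under the mutex.
	n := len(q.rows)
	out := make([]string, n)
	copy(out, q.rows)
	q.rows = q.rows[:0]
	return out
}
```

With this shape, the code comment only has to say that the size may have changed, which stays true regardless of how the writer side is implemented.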
@@ -487,24 +496,33 @@ func (ch *ClickHouseExportProcess) batchCommitAll() (int, error) {

    if err != nil {
        klog.ErrorS(err, "Error when adding record")
        ch.pushFrontRecords(recordsToExport)
I think the new implementation makes sense, given that the likelihood of an error is small
474ebbe to a89111f
This change fixes an unsafe deque access in the flow aggregator ClickHouse client. Reads of the deque, including Len() and At(), are not protected by the mutex during batchCommitAll(), which can run concurrently with another goroutine writing items to the deque with CacheSet().

To avoid holding the mutex for an extended period of time, the following behavior is adopted:
1. (With mutex) Pop Len() items off the front of the deque and store them in a buffer.
2. (Without mutex) Transform the records in the buffer and batch export them with the SQL driver. This step is where I/O happens.
3. (With mutex) If the SQL commit is rolled back, PushFront the records in the buffer back onto the deque. The oldest records are discarded if the deque is full.

Also bumped github.com/gammazero/deque to v0.1.2.

Signed-off-by: Shawn Wang <[email protected]>
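The three steps can be sketched roughly as follows. This is an illustration under assumptions, not the code from this PR: exporter, commitAll, pushFront, the send callback, and errSendFailed are hypothetical names, a slice stands in for the deque, and the send callback stands in for the SQL transaction.

```go
package main

import (
	"errors"
	"sync"
)

// errSendFailed is a hypothetical sentinel for a rolled-back commit.
var errSendFailed = errors.New("send failed")

// exporter is a hypothetical stand-in; queue[0] is the oldest record.
type exporter struct {
	mu    sync.Mutex
	queue []string
	limit int
}

// commitAll drains the queue, sends the batch, and restores it on failure.
func (e *exporter) commitAll(send func([]string) error) (int, error) {
	// Step 1 (with mutex): move everything currently queued into a buffer.
	e.mu.Lock()
	buf := make([]string, len(e.queue))
	copy(buf, e.queue)
	e.queue = e.queue[:0]
	e.mu.Unlock()
	if len(buf) == 0 {
		return 0, nil
	}
	// Step 2 (without mutex): the slow I/O happens here, so writers
	// are not blocked while the batch is in flight.
	if err := send(buf); err != nil {
		// Step 3 (with mutex): put the batch back at the front.
		e.pushFront(buf)
		return 0, err
	}
	return len(buf), nil
}

// pushFront re-queues buf ahead of newer records. If there is not enough
// room, the oldest records in buf are the ones dropped.
func (e *exporter) pushFront(buf []string) {
	e.mu.Lock()
	defer e.mu.Unlock()
	free := e.limit - len(e.queue)
	if free <= 0 {
		return
	}
	if free < len(buf) {
		buf = buf[len(buf)-free:] // keep only the newest records that fit
	}
	e.queue = append(append([]string{}, buf...), e.queue...)
}
```

The trade-off in this shape is that records arriving during step 2 can push the queue toward its limit, so a failed batch may not fit back in full. Dropping the oldest buffered records keeps the eviction policy consistent with what the writer side already does when the queue is full.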
a89111f to 5354f99
/test-e2e
/test-integration