-
Notifications
You must be signed in to change notification settings - Fork 24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: Error handling in StreamingBatchWriter #1913
Conversation
@@ -225,7 +235,7 @@ func TestStreamingBatchSizeRows(t *testing.T) { | |||
t.Fatalf("expected 0 open tables, got %d", l) | |||
} | |||
|
|||
if l := testClient.MessageLen(messageTypeInsert); l != 3 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added two inserts above, but increased this from 3 to 4... This was incorrect to expect 3 because the flushing flow has been different for some time, and you don't need the third message to make it flush.
case err := <-outputCh: | ||
if err != nil { | ||
s.errCh <- fmt.Errorf("handler failed on %s: %w", tableName, err) | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This return
statement is my actual contribution apart from renaming variables and updating tests, oh and better handling of error type panics.
for { | ||
select { | ||
case msg := <-msgs: | ||
if msg == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't you want to msg, ok :=
instead here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah that would be clearer, fixed in 512d6c0
w.lastMsgType = msgType | ||
if err := w.startWorker(ctx, errCh, msg); err != nil { | ||
|
||
case err := <-errCh: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The previous logic wasn't shutting down on error; is this correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previous logic would do a w.logger.Err(err).Msg("error from StreamingBatchWriter")
and continue working.
@erezrokah adding you as well; this is a little tricky with not that much experience with the writer |
Merge at will, I'm not merging right now to prevent any potential noise. (and the unreleased SDK already has some interesting changes) |
🤖 I have created a release *beep* *boop* --- ## [4.64.1](v4.64.0...v4.64.1) (2024-10-02) ### Bug Fixes * Error handling in StreamingBatchWriter ([#1913](#1913)) ([d852119](d852119)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please).
Looks like this broke something cloudquery/cloudquery#19312 |
Reverts #1913 This broke come stuff, so reverting it to unblock SDK changes cloudquery/cloudquery#19312 (comment)
Actual clean up by @murarustefaan but I've done some minor updates to it and updated tests.
Still need to test it E2E.Seems to work, handled thepanic: arrow/array: number of columns/fields mismatch [recovered]
error immediately (I tested with an old version of the S3 plugin, had to manually bump Arrow to v17)