Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug]: Checkpointing doesn't trigger records flush to clickhouse #156

Closed
1 of 2 tasks
karpoftea opened this issue Oct 11, 2024 · 4 comments · Fixed by #158
Closed
1 of 2 tasks

[Bug]: Checkpointing doesn't trigger records flush to clickhouse #156

karpoftea opened this issue Oct 11, 2024 · 4 comments · Fixed by #158

Comments

@karpoftea
Copy link

What happened?

Originally I've tested connector agains type error failures (type incompatibility between kafka source table and clickhouse sink table): selected from kafka-table integer column (say, json number field as cnt INTEGER in kafka table) and inserted it to clickhouse table column (Int64). If cnt=1 everything works as expected - value is saved to clickhouse. If in clickhouse I change column type to UInt64 and cnt=-1 then exception occurs (which is OK), task restart and after several restart it changes state to RUNNING (so just leaving corrupted message behind). That is not an expected behaviour, because data was lost. Expected behaviour is to stuck and wait for manual resolve (either move offset or change clickhouse table schema).

After I digged into code and found that DynamicTableSink is implemented using OutputFormatProvider/OutputFormat. My guess that OutputFormat does not call flush() when checkpoint occurs and thus checkpointing is always OK. Then I changed connector sink.flush-interval to 10min and set flink checkpoint to 1min, and saw that ClickHouseBatchOutputFormat.flush() is not triggered by checkpoint. Seems like my guess is right.

Can you kindly tell If using OutputFormat as a SinkRuntimeProvider was a design choice? If yes what was the reason for not choosing SinkAPI (org.apache.flink.api.connector.sink2.Sink) for implementation?

Affects Versions

master/1.16.0

What are you seeing the problem on?

Flink-Table-Api (SQL)

How to reproduce

  1. create kafka source table and clickhouse sink table
  2. select from kafka source and insert selected value to clickhouse sink
  3. set flink checkpoint interval to 1m
  4. set sink.flush-interval to 10min
  5. start cluster and submit pipeline
  6. push 1 message to kafka.
  7. wait for a checkpoint
  8. after checkpoint occurs see that (1) offset has moved forward (2) message was not delivered to clickhouse

Relevant log output

No response

Anything else

the core problem is that checkpointing does not trigger flush, so event if sink has exception (flushException) it will be healthy for a flink runtime

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

  • I agree to follow this project's Code of Conduct
@itinycheng
Copy link
Owner

Hi @karpoftea

Thanks for the insight!
The initial version of the project was developed with reference to flink-connector-jdbc 1.12 which uses SinkRuntimeProvider. I didn't realize before that the checkpoint doesn't trigger the flush function, so thank you very much.
Maybe I want to use SinkFunctionProvider as a temporary solution, due to its simplicity and ease of implementation, org.apache.flink.api.connector.sink2.Sink is better but requires more effort.

@LJSJackson
Copy link

I seem to have encountered the same problem, have you solved it yet? Could you please help me and tell me how to solve it? I hope to receive your reply. Thank you @karpoftea

@karpoftea
Copy link
Author

@LJSJackson no I haven’t. I’m on a way to launch this connector as is on production workloads, an if no critical issue arises, then propose a patch for this.
I can’t see an easy (1-2 days of coding) way to fix it right now.

@itinycheng
Copy link
Owner

Hello all:

I made a temporary fix, sorry for the late submission.

Hi @karpoftea:
If you have any suggestions for improvements, please feel free to share your thoughts. Your feedback is greatly appreciated.
Thank you!

czy006 added a commit that referenced this issue Dec 15, 2024
[fix]: Ensure checkpointing triggers records flush to ClickHouse #156
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
3 participants