Flink-CDC 2.3.0 consumes data based on SPECIFIC_OFFSETS. If the table structure is changed after the starting offset, it will not be able to consume the data correctly. #1962

Closed
2 tasks done
wallkop opened this issue Mar 6, 2023 · 6 comments
Labels
bug Something isn't working

Comments

@wallkop
Contributor

wallkop commented Mar 6, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Flink version

1.14.5

Flink CDC version

2.3.0

Database and its version

5.7.37

Minimal reproduce step

1. Run `SHOW MASTER STATUS` and record the GTID set: cacec830-00de-11ed-bddc-525400b586f9:1-358. The later steps consume the binlog of the tbl_test table by passing this GTID set to specificOffset.

2. Insert a record into the tbl_test table:
INSERT INTO `test_split_1`.`tbl_test`(`id`) VALUES (2);

3. Change the table structure by adding a new column:
ALTER TABLE `test_split_1`.`tbl_test` ADD COLUMN `t` varchar(255) NOT NULL AFTER `v`;

4. Insert another record into the tbl_test table:
INSERT INTO `test_split_1`.`tbl_test`(`id`) VALUES (3);

5. Set `StartupOptions.specificOffset("cacec830-00de-11ed-bddc-525400b586f9:1-358")` so that the binlog is consumed from a position before the table structure change.

6. CDC throws an error and cannot recover.

What did you expect to see?

The CDC program consumes the binlog normally from the specified GTIDs

What did you see instead?

The CDC program throws an error:

2023-03-06 11:30:04,454 2023-03-06 11:30:04,454 ERROR [io.debezium.pipeline.ErrorHandler] - Producer failure
com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.debezium.DebeziumException: Error processing binlog event
	... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
	... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
	... 12 more
2023-03-06 11:30:04,456 2023-03-06 11:30:04,456 INFO [io.debezium.connector.mysql.MySqlStreamingChangeEventSource] - Error processing binlog event, and propagating to Kafka Connect so it stops this connector. Future binlog events read before connector is shutdown will be ignored.
2023-03-06 11:30:04,531 2023-03-06 11:30:04,531 INFO [io.debezium.connector.mysql.MySqlStreamingChangeEventSource] - Keepalive thread is running
2023-03-06 11:30:04,772 2023-03-06 11:30:04,772 ERROR [org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager] - Received uncaught exception.
java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
	... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
	... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
	... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
	... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
	... 12 more
2023-03-06 11:30:04,773 2023-03-06 11:30:04,773 INFO [com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader] - Close current debezium reader com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader
2023-03-06 11:30:04,784 2023-03-06 11:30:04,784 INFO [io.debezium.jdbc.JdbcConnection] - Connection gracefully closed
2023-03-06 11:30:04,785 2023-03-06 11:30:04,785 INFO [io.debezium.connector.mysql.MySqlStreamingChangeEventSource] - Stopped reading binlog after 0 events, no new offset was recorded
2023-03-06 11:30:04,786 2023-03-06 11:30:04,786 INFO [org.apache.flink.connector.base.source.reader.SourceReaderBase] - Closing Source Reader.
2023-03-06 11:30:04,786 2023-03-06 11:30:04,786 INFO [org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher] - Shutting down split fetcher 0
2023-03-06 11:30:04,837 2023-03-06 11:30:04,837 WARN [com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader] - Failed to close the binlog split reader in 30 seconds.
2023-03-06 11:30:04,837 2023-03-06 11:30:04,837 INFO [org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher] - Split fetcher 0 exited.
2023-03-06 11:30:04,844 2023-03-06 11:30:04,844 WARN [org.apache.flink.runtime.taskmanager.Task] - Source: mysql-cdc-source (1/1)#0 (e98fb9c665c2e2b107663f21b343ae50) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
	... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
	... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
	... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
	... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
	... 12 more

2023-03-06 11:30:04,844 2023-03-06 11:30:04,844 INFO [org.apache.flink.runtime.taskmanager.Task] - Freeing task resources for Source: mysql-cdc-source (1/1)#0 (e98fb9c665c2e2b107663f21b343ae50).
2023-03-06 11:30:04,858 2023-03-06 11:30:04,858 INFO [org.apache.flink.runtime.taskexecutor.TaskExecutor] - Un-registering task and sending final execution state FAILED to JobManager for task Source: mysql-cdc-source (1/1)#0 e98fb9c665c2e2b107663f21b343ae50.
2023-03-06 11:30:04,866 2023-03-06 11:30:04,866 INFO [org.apache.flink.runtime.executiongraph.ExecutionGraph] - Source: mysql-cdc-source (1/1) (e98fb9c665c2e2b107663f21b343ae50) switched from RUNNING to FAILED on cdc74e39-140c-47bb-868b-7f58874b0c14 @ localhost (dataPort=-1).
java.lang.RuntimeException: One or more fetchers have encountered exception
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
	at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
	at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
	at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
	at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
	at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
	at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
	... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to SPECIFIC_OFFSETS
	at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
	... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
	... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678072845, file=mysql-bin.000008, pos=80481, gtids=cacec830-00de-11ed-bddc-525400b586f9:1-357, row=1, server_id=1, event=2}
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleInsert$4(MySqlStreamingChangeEventSource.java:795)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleInsert(MySqlStreamingChangeEventSource.java:787)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$16(MySqlStreamingChangeEventSource.java:981)
	at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
	... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
	at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
	at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
	at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitCreateRecord(RelationalChangeRecordEmitter.java:69)
	at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:45)
	at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
	... 12 more

Anything else?

In real business scenarios, it is very common for a table's structure to have changed after the starting binlog offset when recovering data with SPECIFIC_OFFSETS. Unless this is resolved, data recovery from SPECIFIC_OFFSETS is much less useful.

After reading the source code of CDC and Debezium, I found that when recovering data based on SPECIFIC_OFFSETS, the table structure is still taken from the latest schema. To solve this, I believe a schema snapshot as of the starting binlog timestamp is needed. It could be obtained as follows:

1. Obtain the latest schema.
2. Consume the binlog from the starting position. If a row's format is inconsistent with that schema, enter a historical schema recovery path (the current source code throws an exception at this point).
3. Replay the binlog, reading only schema change events, to restore the schema to the historical snapshot corresponding to the starting binlog timestamp.
4. Using that historical snapshot, consume from the starting binlog position again, applying schema change events as they arrive to keep the schema up to date.
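
The steps above can be sketched as a toy model. This is only an illustration under stated assumptions, not Flink CDC or Debezium code: the `Event` class, `schemaAtStart`, and `consume` are hypothetical names, the only DDL modeled is an ADD COLUMN that appends a column at the end, and step 3 is interpreted as undoing the schema changes found after the start offset, newest first. The column names `id` and `v` are assumed from the reproduction steps; the full table definition is not shown in this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model of the proposed recovery; all names here are hypothetical.
final class SchemaReplaySketch {

    /** A binlog event: a DDL event when addedColumn != null, otherwise a row event. */
    static final class Event {
        final String addedColumn;
        final Object[] row;

        private Event(String addedColumn, Object[] row) {
            this.addedColumn = addedColumn;
            this.row = row;
        }

        static Event addColumn(String name) {
            return new Event(name, null);
        }

        static Event row(Object... values) {
            return new Event(null, values);
        }
    }

    /** Step 3: starting from the latest schema, undo (newest first) each ADD COLUMN seen after the start offset. */
    static List<String> schemaAtStart(List<String> latestSchema, List<Event> eventsFromStart) {
        List<String> schema = new ArrayList<>(latestSchema);
        for (int i = eventsFromStart.size() - 1; i >= 0; i--) {
            String added = eventsFromStart.get(i).addedColumn;
            if (added != null) {
                schema.remove(added);
            }
        }
        return schema;
    }

    /** Step 4: consume from the start offset again, updating the schema on every DDL event. */
    static List<String> consume(List<String> startSchema, List<Event> eventsFromStart) {
        List<String> schema = new ArrayList<>(startSchema);
        List<String> decoded = new ArrayList<>();
        for (Event e : eventsFromStart) {
            if (e.addedColumn != null) {
                schema.add(e.addedColumn);
                continue;
            }
            if (e.row.length != schema.size()) {
                throw new IllegalStateException("schema out of sync with row");
            }
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < schema.size(); i++) {
                if (i > 0) {
                    sb.append(", ");
                }
                sb.append(schema.get(i)).append('=').append(e.row[i]);
            }
            decoded.add(sb.toString());
        }
        return decoded;
    }

    public static void main(String[] args) {
        // Step 1: the latest schema, as seen after the ALTER TABLE.
        List<String> latest = Arrays.asList("id", "v", "t");
        // Events after the start offset: insert, add column t, insert.
        List<Event> events = Arrays.asList(
                Event.row(2, null), Event.addColumn("t"), Event.row(3, null, ""));
        List<String> start = schemaAtStart(latest, events);
        System.out.println("schema at start offset: " + start);
        System.out.println("decoded rows: " + consume(start, events));
    }
}
```

The sketch also shows where the cost mentioned below comes from: `schemaAtStart` has to scan every event after the start offset before any row can be emitted.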

The main drawback of this approach is that recovering the historical schema snapshot could take a long time. Is there a better solution?

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@wallkop wallkop added the bug Something isn't working label Mar 6, 2023
@wallkop wallkop changed the title from "[Bug] Flink-CDC 2.3.0 consumes data based on SPECIFIC_OFFSETS; if the table structure is changed after the starting offset, the data cannot be consumed correctly" to "[Bug] Flink-CDC 2.3.0 consumes data based on SPECIFIC_OFFSETS. If the table structure is changed after the starting offset, it will not be able to consume the data correctly." Mar 6, 2023
@ruanhang1993
Contributor

This is an expected behavior, which has been written in the mysql cdc connector doc.

If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.
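
A toy model of that failure mode may make the quoted explanation concrete. It is a sketch under stated assumptions, not Debezium's implementation: `SchemaMismatchDemo`, `rowFitsSchema`, and `decode` are hypothetical names, and the columns `id` and `v` are assumed from the reproduction steps. A row written before the ALTER TABLE carries fewer values than the latest schema has columns, so decoding it against the latest schema fails.

```java
import java.util.Arrays;
import java.util.List;

// Toy model of the length check behind "Data row is smaller than a column index".
final class SchemaMismatchDemo {

    /** Returns true when the row has a value for every column of the schema. */
    static boolean rowFitsSchema(List<String> schemaColumns, Object[] row) {
        return row.length >= schemaColumns.size();
    }

    /** Maps a row to "column=value" pairs, or throws when the row is too short. */
    static String decode(List<String> schemaColumns, Object[] row) {
        if (!rowFitsSchema(schemaColumns, row)) {
            throw new IllegalStateException(
                    "Data row is smaller than a column index: row has " + row.length
                            + " values, schema has " + schemaColumns.size() + " columns");
        }
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < schemaColumns.size(); i++) {
            if (i > 0) {
                sb.append(", ");
            }
            sb.append(schemaColumns.get(i)).append('=').append(row[i]);
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Latest schema, read after ALTER TABLE ... ADD COLUMN t (column list assumed).
        List<String> latestSchema = Arrays.asList("id", "v", "t");
        Object[] rowAfterAlter = {3, null, ""}; // has a value for t
        Object[] rowBeforeAlter = {2, null};    // written before t existed

        System.out.println(decode(latestSchema, rowAfterAlter));
        try {
            decode(latestSchema, rowBeforeAlter);
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```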

@ruanhang1993
Contributor

#1724 is the PR about this part.

@wallkop wallkop changed the title from "[Bug] Flink-CDC 2.3.0 consumes data based on SPECIFIC_OFFSETS. If the table structure is changed after the starting offset, it will not be able to consume the data correctly." to "Flink-CDC 2.3.0 consumes data based on SPECIFIC_OFFSETS. If the table structure is changed after the starting offset, it will not be able to consume the data correctly." Mar 8, 2023
@HoboLegion

flink 1.14.5
flinkcdc 2.3.0
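This reproduces reliably: when restarting from a binlog offset, if a DDL statement exists after that offset (e.g., a newly added column), the problem described above occurs.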

`2023-03-14 11:19:41,676 WARN com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader - Failed to close the binlog split reader in 30 seconds.
2023-03-14 11:19:41,676 WARN org.apache.flink.runtime.taskmanager.Task - Source: MySQLSource -> Sink: Print to Std. Out (5/12)#1 (eaed28dabed1ff239d50be1f7fd732c7) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:156)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:354)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.ConnectException: An exception occurred in the change event producer. This connector will be stopped.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:42)
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:79)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:422)
at com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask.handleEvent(MySqlBinlogSplitReadTask.java:88)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$25(MySqlStreamingChangeEventSource.java:1017)
at com.github.shyiko.mysql.binlog.BinaryLogClient.notifyEventListeners(BinaryLogClient.java:1125)
at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:973)
at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:599)
at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:857)
... 1 more
Caused by: com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException: Internal schema representation is probably out of sync with real database schema. The reason could be that the table schema was changed after the starting binlog offset, which is not supported when startup mode is set to TIMESTAMP
at com.ververica.cdc.connectors.mysql.debezium.task.context.MySqlErrorHandler.setProducerThrowable(MySqlErrorHandler.java:84)
... 8 more
Caused by: io.debezium.DebeziumException: Error processing binlog event
... 8 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Error while processing event at offset {transaction_id=null, ts_sec=1678758780, file=, pos=378512, row=1, server_id=1, event=2}
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:253)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$handleUpdate$6(MySqlStreamingChangeEventSource.java:817)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleChange(MySqlStreamingChangeEventSource.java:878)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleUpdate(MySqlStreamingChangeEventSource.java:809)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.lambda$execute$18(MySqlStreamingChangeEventSource.java:989)
at io.debezium.connector.mysql.MySqlStreamingChangeEventSource.handleEvent(MySqlStreamingChangeEventSource.java:404)
... 7 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Data row is smaller than a column index, internal schema representation is probably out of sync with real database schema
at io.debezium.relational.TableSchemaBuilder.validateIncomingRowToInternalMetadata(TableSchemaBuilder.java:221)
at io.debezium.relational.TableSchemaBuilder.lambda$createValueGenerator$5(TableSchemaBuilder.java:250)
at io.debezium.relational.TableSchema.valueFromColumnData(TableSchema.java:141)
at io.debezium.relational.RelationalChangeRecordEmitter.emitUpdateRecord(RelationalChangeRecordEmitter.java:100)
at io.debezium.relational.RelationalChangeRecordEmitter.emitChangeRecords(RelationalChangeRecordEmitter.java:51)
at io.debezium.pipeline.EventDispatcher.dispatchDataChangeEvent(EventDispatcher.java:217)
... 12 more

Process finished with exit code -1
`

@wallkop
Contributor Author

wallkop commented Mar 14, 2023

This is an expected behavior, which has been written in the mysql cdc connector doc.

If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.

Thank you very much, @ruanhang1993. I understand your point, so I removed the bug label, but I did not close this issue. I believe this should be an enhancement. In the Flink CDC DingTalk community, multiple users have also reported this issue. LeonardBang suggested creating an issue to record and attempt to resolve it.

Hello, @leonardBang, is there a plan to solve this issue?

@wallkop wallkop closed this as completed Apr 3, 2023
@ltylty

ltylty commented May 31, 2023

is this fixed? same problem with me.

@maikouliujian

is this fixed? same problem with me.
