Skip to content

Commit

Permalink
[mysql] Improve error message under the case that start reading from …
Browse files Browse the repository at this point in the history
…earliest but schema change happened before (#1724)
  • Loading branch information
PatrickRen committed Nov 9, 2022
1 parent 125ff23 commit 5ae25c8
Show file tree
Hide file tree
Showing 6 changed files with 138 additions and 17 deletions.
4 changes: 3 additions & 1 deletion docs/content/connectors/mysql-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,10 @@ CREATE TABLE mysql_source (...) WITH (
)
```

**注意**MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"
**注意**
1. MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"
该日志可以帮助将作业从某个 checkpoint 的位点开始启动的场景。
2. 如果捕获变更的表曾经发生过表结构变化,从最早位点、特定位点或时间戳启动可能会发生错误,因为 Debezium 读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。


### DataStream Source
Expand Down
5 changes: 4 additions & 1 deletion docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,11 @@ CREATE TABLE mysql_source (...) WITH (
)
```

**Note:** MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
**Notes:**
1. MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
"Binlog offset on checkpoint {checkpoint-id}". It could be useful if you want to restart the job from a specific checkpointed position.
2. If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.

### DataStream Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package com.ververica.cdc.connectors.mysql.debezium.task.context;

import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,12 +43,17 @@ public class MySqlErrorHandler extends ErrorHandler {
Pattern.compile(
"Encountered change event for table (.+)\\.(.+) whose schema isn't known to this connector");

MySqlTaskContext context;
private final MySqlTaskContext context;
private final MySqlSourceConfig sourceConfig;

public MySqlErrorHandler(
String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext context) {
String logicalName,
ChangeEventQueue<?> queue,
MySqlTaskContext context,
MySqlSourceConfig sourceConfig) {
super(MySqlConnector.class, logicalName, queue);
this.context = context;
this.sourceConfig = sourceConfig;
}

@Override
Expand All @@ -53,20 +63,56 @@ protected boolean isRetriable(Throwable throwable) {

@Override
public void setProducerThrowable(Throwable producerThrowable) {
if (producerThrowable.getCause() instanceof DebeziumException) {
DebeziumException e = (DebeziumException) producerThrowable.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
if (matcher.find()) {
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
}
if (isTableNotFoundException(producerThrowable)) {
Matcher matcher =
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
}
}

if (isSchemaOutOfSyncException(producerThrowable)) {
super.setProducerThrowable(
new SchemaOutOfSyncException(
"Internal schema representation is probably out of sync with real database schema. "
+ "The reason could be that the table schema was changed after the starting "
+ "binlog offset, which is not supported when startup mode is set to "
+ sourceConfig.getStartupOptions().startupMode,
producerThrowable));
return;
}

super.setProducerThrowable(producerThrowable);
}

private boolean isTableNotFoundException(Throwable t) {
if (!(t.getCause() instanceof DebeziumException)) {
return false;
}
DebeziumException e = (DebeziumException) t.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
return matcher.find();
}

private boolean isSchemaOutOfSyncException(Throwable t) {
Throwable rootCause = ExceptionUtils.getRootCause(t);
return rootCause instanceof ConnectException
&& rootCause
.getMessage()
.endsWith(
"internal schema representation is probably out of sync with real database schema")
&& isSettingStartingOffset();
}

private boolean isSettingStartingOffset() {
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
return startupMode == StartupMode.EARLIEST_OFFSET
|| startupMode == StartupMode.TIMESTAMP
|| startupMode == StartupMode.SPECIFIC_OFFSETS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ public void configure(MySqlSplit mySqlSplit) {
changeEventSourceMetricsFactory.getStreamingMetrics(
taskContext, queue, metadataProvider);
this.errorHandler =
new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, taskContext);
new MySqlErrorHandler(
connectorConfig.getLogicalName(), queue, taskContext, sourceConfig);
}

private void validateAndLoadDatabaseHistory(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.debezium.task.context.exception;

/**
* A wrapper class for clearly show the possible reason of a schema-out-of-sync exception thrown
* inside Debezium.
*/
public class SchemaOutOfSyncException extends Exception {
public SchemaOutOfSyncException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.ExceptionUtils;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.dispatcher.SignalEventDispatcher;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceTestBase;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlBinlogSplitAssigner;
import com.ververica.cdc.connectors.mysql.source.assigners.MySqlSnapshotSplitAssigner;
Expand Down Expand Up @@ -65,7 +67,9 @@

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSnapshotSplitInfo;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getStartingOffsetOfBinlogSplit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;

/** Tests for {@link BinlogSplitReader}. */
Expand Down Expand Up @@ -363,6 +367,43 @@ public void testReadBinlogFromEarliestOffset() throws Exception {
assertEqualsInOrder(Arrays.asList(expected), actual);
}

@Test
public void testReadBinlogFromEarliestOffsetAfterSchemaChange() throws Exception {
customerDatabase.createAndInitialize();
MySqlSourceConfig sourceConfig =
getConfig(StartupOptions.earliest(), new String[] {"customers"});
binaryLogClient = DebeziumUtils.createBinaryClient(sourceConfig.getDbzConfiguration());
mySqlConnection = DebeziumUtils.createMySqlConnection(sourceConfig);
String tableId = customerDatabase.qualifiedTableName("customers");
DataType dataType =
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.BIGINT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("address", DataTypes.STRING()),
DataTypes.FIELD("phone_number", DataTypes.STRING()));

// Add a column to the table
addColumnToTable(mySqlConnection, tableId);

// Create reader and submit splits
MySqlBinlogSplit split = createBinlogSplit(sourceConfig);
BinlogSplitReader reader = createBinlogReader(sourceConfig);
reader.submitSplit(split);

// An exception is expected here because the table schema is changed, which is not allowed
// under earliest startup mode.
Throwable throwable =
assertThrows(Throwable.class, () -> readBinlogSplits(dataType, reader, 1));
Optional<SchemaOutOfSyncException> schemaOutOfSyncException =
ExceptionUtils.findThrowable(throwable, SchemaOutOfSyncException.class);
assertTrue(schemaOutOfSyncException.isPresent());
assertEquals(
"Internal schema representation is probably out of sync with real database schema. "
+ "The reason could be that the table schema was changed after the starting "
+ "binlog offset, which is not supported when startup mode is set to EARLIEST_OFFSET",
schemaOutOfSyncException.get().getMessage());
}

@Test
public void testReadBinlogFromBinlogFilePosition() throws Exception {
// Preparations
Expand Down Expand Up @@ -1002,5 +1043,6 @@ private MySqlSourceConfigFactory getConfigFactory(String[] captureTables) {
private void addColumnToTable(JdbcConnection connection, String tableId) throws Exception {
connection.execute(
"ALTER TABLE " + tableId + " ADD COLUMN new_int_column INT DEFAULT 15213");
connection.commit();
}
}

0 comments on commit 5ae25c8

Please sign in to comment.