Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BP-2.3][mysql] Use event filter to drop events earlier than the specified timestamp in timestamp startup mode #1726

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/content/connectors/mysql-cdc(ZH).md
Original file line number Diff line number Diff line change
Expand Up @@ -589,8 +589,10 @@ CREATE TABLE mysql_source (...) WITH (
)
```

**注意**:MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"。
**注意**:
1. MySQL source 会在 checkpoint 时将当前位点以 INFO 级别打印到日志中,日志前缀为 "Binlog offset on checkpoint {checkpoint-id}"。
该日志可以帮助将作业从某个 checkpoint 的位点开始启动的场景。
2. 如果捕获变更的表曾经发生过表结构变化,从最早位点、特定位点或时间戳启动可能会发生错误,因为 Debezium 读取器会在内部保存当前的最新表结构,结构不匹配的早期数据无法被正确解析。


### DataStream Source
Expand Down
5 changes: 4 additions & 1 deletion docs/content/connectors/mysql-cdc.md
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,11 @@ CREATE TABLE mysql_source (...) WITH (
)
```

**Note:** MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
**Notes:**
1. MySQL source will print the current binlog position into logs with INFO level on checkpoint, with the prefix
"Binlog offset on checkpoint {checkpoint-id}". It could be useful if you want to restart the job from a specific checkpointed position.
2. If schema of capturing tables was changed previously, starting with earliest offset, specific offset or timestamp
could fail as the Debezium reader keeps the current latest table schema internally and earlier records with unmatched schema cannot be correctly parsed.

### DataStream Source

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,12 @@

import org.apache.flink.shaded.guava30.com.google.common.util.concurrent.ThreadFactoryBuilder;

import com.github.shyiko.mysql.binlog.event.Event;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.ververica.cdc.connectors.mysql.debezium.task.MySqlBinlogSplitReadTask;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset;
import com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetKind;
import com.ververica.cdc.connectors.mysql.source.split.FinishedSnapshotSplitInfo;
import com.ververica.cdc.connectors.mysql.source.split.MySqlBinlogSplit;
import com.ververica.cdc.connectors.mysql.source.split.MySqlSplit;
Expand Down Expand Up @@ -54,6 +57,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getSplitKey;
Expand Down Expand Up @@ -111,7 +115,8 @@ public void submitSplit(MySqlSplit mySqlSplit) {
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
currentBinlogSplit);
currentBinlogSplit,
createEventFilter(currentBinlogSplit.getStartingOffset()));

executorService.submit(
() -> {
Expand Down Expand Up @@ -299,6 +304,21 @@ private void configureFilter() {
this.pureBinlogPhaseTables.clear();
}

private Predicate<Event> createEventFilter(BinlogOffset startingOffset) {
// If the startup mode is set as TIMESTAMP, we need to apply a filter on event to drop
// events earlier than the specified timestamp.
if (BinlogOffsetKind.TIMESTAMP.equals(startingOffset.getOffsetKind())) {
long startTimestampSec = startingOffset.getTimestampSec();
// Notes:
// 1. Heartbeat event doesn't contain timestamp, so we just keep it
// 2. Timestamp of event is in epoch millisecond
return event ->
EventType.HEARTBEAT.equals(event.getHeader().getEventType())
|| event.getHeader().getTimestamp() >= startTimestampSec * 1000;
}
return event -> true;
}

public void stopBinlogReadTask() {
this.currentTaskRunning = false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ private MySqlBinlogSplitReadTask createBackfillBinlogReadTask(
statefulTaskContext.getTaskContext(),
(MySqlStreamingChangeEventSourceMetrics)
statefulTaskContext.getStreamingChangeEventSourceMetrics(),
backfillBinlogSplit);
backfillBinlogSplit,
event -> true);
}

private void dispatchBinlogEndEvent(MySqlBinlogSplit backFillBinlogSplit)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.Predicate;

import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.isNonStoppingOffset;
import static com.ververica.cdc.connectors.mysql.source.utils.RecordUtils.getBinlogPosition;

Expand All @@ -49,6 +51,7 @@ public class MySqlBinlogSplitReadTask extends MySqlStreamingChangeEventSource {
private final EventDispatcherImpl<TableId> eventDispatcher;
private final SignalEventDispatcher signalEventDispatcher;
private final ErrorHandler errorHandler;
private final Predicate<Event> eventFilter;
private ChangeEventSourceContext context;

public MySqlBinlogSplitReadTask(
Expand All @@ -60,12 +63,14 @@ public MySqlBinlogSplitReadTask(
Clock clock,
MySqlTaskContext taskContext,
MySqlStreamingChangeEventSourceMetrics metrics,
MySqlBinlogSplit binlogSplit) {
MySqlBinlogSplit binlogSplit,
Predicate<Event> eventFilter) {
super(connectorConfig, connection, dispatcher, errorHandler, clock, taskContext, metrics);
this.binlogSplit = binlogSplit;
this.eventDispatcher = dispatcher;
this.errorHandler = errorHandler;
this.signalEventDispatcher = signalEventDispatcher;
this.eventFilter = eventFilter;
}

@Override
Expand All @@ -77,6 +82,9 @@ public void execute(ChangeEventSourceContext context, MySqlOffsetContext offsetC

@Override
protected void handleEvent(MySqlOffsetContext offsetContext, Event event) {
if (!eventFilter.test(event)) {
return;
}
super.handleEvent(offsetContext, event);
// check do we need to stop for read binlog for snapshot split.
if (isBoundedRead()) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,17 @@

package com.ververica.cdc.connectors.mysql.debezium.task.context;

import com.ververica.cdc.connectors.mysql.debezium.task.context.exception.SchemaOutOfSyncException;
import com.ververica.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import com.ververica.cdc.connectors.mysql.table.StartupMode;
import io.debezium.DebeziumException;
import io.debezium.connector.base.ChangeEventQueue;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlTaskContext;
import io.debezium.pipeline.ErrorHandler;
import io.debezium.relational.TableId;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -38,12 +43,17 @@ public class MySqlErrorHandler extends ErrorHandler {
Pattern.compile(
"Encountered change event for table (.+)\\.(.+) whose schema isn't known to this connector");

MySqlTaskContext context;
private final MySqlTaskContext context;
private final MySqlSourceConfig sourceConfig;

public MySqlErrorHandler(
String logicalName, ChangeEventQueue<?> queue, MySqlTaskContext context) {
String logicalName,
ChangeEventQueue<?> queue,
MySqlTaskContext context,
MySqlSourceConfig sourceConfig) {
super(MySqlConnector.class, logicalName, queue);
this.context = context;
this.sourceConfig = sourceConfig;
}

@Override
Expand All @@ -53,20 +63,56 @@ protected boolean isRetriable(Throwable throwable) {

@Override
public void setProducerThrowable(Throwable producerThrowable) {
if (producerThrowable.getCause() instanceof DebeziumException) {
DebeziumException e = (DebeziumException) producerThrowable.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
if (matcher.find()) {
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
}
if (isTableNotFoundException(producerThrowable)) {
Matcher matcher =
NOT_FOUND_TABLE_MSG_PATTERN.matcher(producerThrowable.getCause().getMessage());
String databaseName = matcher.group(1);
String tableName = matcher.group(2);
TableId tableId = new TableId(databaseName, null, tableName);
if (context.getSchema().schemaFor(tableId) == null) {
LOG.warn("Schema for table " + tableId + " is null");
return;
}
}

if (isSchemaOutOfSyncException(producerThrowable)) {
super.setProducerThrowable(
new SchemaOutOfSyncException(
"Internal schema representation is probably out of sync with real database schema. "
+ "The reason could be that the table schema was changed after the starting "
+ "binlog offset, which is not supported when startup mode is set to "
+ sourceConfig.getStartupOptions().startupMode,
producerThrowable));
return;
}

super.setProducerThrowable(producerThrowable);
}

private boolean isTableNotFoundException(Throwable t) {
if (!(t.getCause() instanceof DebeziumException)) {
return false;
}
DebeziumException e = (DebeziumException) t.getCause();
String detailMessage = e.getMessage();
Matcher matcher = NOT_FOUND_TABLE_MSG_PATTERN.matcher(detailMessage);
return matcher.find();
}

private boolean isSchemaOutOfSyncException(Throwable t) {
Throwable rootCause = ExceptionUtils.getRootCause(t);
return rootCause instanceof ConnectException
&& rootCause
.getMessage()
.endsWith(
"internal schema representation is probably out of sync with real database schema")
&& isSettingStartingOffset();
}

private boolean isSettingStartingOffset() {
StartupMode startupMode = sourceConfig.getStartupOptions().startupMode;
return startupMode == StartupMode.EARLIEST_OFFSET
|| startupMode == StartupMode.TIMESTAMP
|| startupMode == StartupMode.SPECIFIC_OFFSETS;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import java.util.List;
import java.util.Map;

import static com.ververica.cdc.connectors.mysql.debezium.task.context.ChangeEventCreatorFactory.createChangeEventCreator;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffset.BINLOG_FILENAME_OFFSET_KEY;
import static com.ververica.cdc.connectors.mysql.source.offset.BinlogOffsetUtils.initializeEffectiveOffset;

Expand Down Expand Up @@ -145,7 +144,7 @@ public void configure(MySqlSplit mySqlSplit) {
databaseSchema,
queue,
connectorConfig.getTableFilters().dataCollectionFilter(),
createChangeEventCreator(mySqlSplit),
DataChangeEvent::new,
metadataProvider,
schemaNameAdjuster);

Expand All @@ -166,7 +165,8 @@ public void configure(MySqlSplit mySqlSplit) {
changeEventSourceMetricsFactory.getStreamingMetrics(
taskContext, queue, metadataProvider);
this.errorHandler =
new MySqlErrorHandler(connectorConfig.getLogicalName(), queue, taskContext);
new MySqlErrorHandler(
connectorConfig.getLogicalName(), queue, taskContext, sourceConfig);
}

private void validateAndLoadDatabaseHistory(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 Ververica Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.ververica.cdc.connectors.mysql.debezium.task.context.exception;

/**
* A wrapper class for clearly show the possible reason of a schema-out-of-sync exception thrown
* inside Debezium.
*/
public class SchemaOutOfSyncException extends Exception {
public SchemaOutOfSyncException(String message, Throwable cause) {
super(message, cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.ververica.cdc.connectors.mysql.source.offset;

import com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils;
import com.ververica.cdc.connectors.mysql.debezium.task.context.ChangeEventCreatorFactory;
import com.ververica.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
import io.debezium.connector.mysql.MySqlConnection;

Expand All @@ -39,8 +38,7 @@ public class BinlogOffsetUtils {
* <ul>
* <li>EARLIEST: binlog filename = "", position = 0
* <li>TIMESTAMP: set to earliest, as the current implementation is reading from the earliest
* offset and drop events earlier than the specified timestamp. See {@link
* ChangeEventCreatorFactory#createChangeEventCreator}.
* offset and drop events earlier than the specified timestamp.
* <li>LATEST: fetch the current binlog by JDBC
* </ul>
*/
Expand Down
Loading