diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java index ff0c9d7946..b290d7f6af 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/sink/connector/CanalSinkConnector.java @@ -278,7 +278,7 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C List> totalRows = batch.getBatches(); List filteredRows = new ArrayList<>(); for (List canalConnectRecords : totalRows) { - canalConnectRecords = filterRecord(canalConnectRecords, sinkConfig); + canalConnectRecords = filterRecord(canalConnectRecords); if (!CollectionUtils.isEmpty(canalConnectRecords)) { for (final CanalConnectRecord record : canalConnectRecords) { boolean filter = interceptor.before(sinkConfig, record); diff --git a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java index bf2a8af27a..6cd575cb77 100644 --- a/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java +++ b/eventmesh-connectors/eventmesh-connector-canal/src/main/java/org/apache/eventmesh/connector/canal/source/connector/CanalSourceConnector.java @@ -150,6 +150,8 @@ protected void startEventParserInternal(CanalEventParser parser, boolean isGroup return instance; } }); + DatabaseConnection.sourceConfig = sourceConfig.getSourceConnectorConfig(); + DatabaseConnection.initSourceConnection(); tableMgr = new RdbTableMgr(sourceConfig.getSourceConnectorConfig(), DatabaseConnection.sourceDataSource); }