Skip to content

Commit

Permalink
[ISSUE #5052] Enhancement for source\sink connector (#5066)
Browse files Browse the repository at this point in the history
* [ISSUE #5040] Support gtid mode for sync data with mysql

* fix conflicts with master

* fix checkstyle error

* [ISSUE #5044] Data synchronization strong verification in mariadb gtid mode

* fix checkstyle error

* [ISSUE #5048] Add report verify request to admin for connector runtime

* fix checkstyle error

* [ISSUE #5052] Enhancement for source\sink connector

* fix checkstyle error

* fix checkstyle error
  • Loading branch information
xwm1992 authored Aug 1, 2024
1 parent c1dd6b0 commit 2ba54c7
Show file tree
Hide file tree
Showing 52 changed files with 344 additions and 77 deletions.
52 changes: 18 additions & 34 deletions eventmesh-admin-server/conf/eventmesh.sql
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,11 @@
/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */;


-- 导出 eventmesh 的数据库结构
-- export eventmesh database
CREATE DATABASE IF NOT EXISTS `eventmesh` /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */ /*!80016 DEFAULT ENCRYPTION='N' */;
USE `eventmesh`;

-- 导出 表 eventmesh.event_mesh_data_source 结构
-- export table eventmesh.event_mesh_data_source structure
CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`dataType` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand All @@ -39,11 +39,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_data_source` (
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_job_info 结构
-- export table eventmesh.event_mesh_job_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
Expand All @@ -61,11 +59,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_job_info` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `jobID` (`jobID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_mysql_position 结构
-- export table eventmesh.event_mesh_mysql_position structure
CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`jobID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand All @@ -80,11 +76,9 @@ CREATE TABLE IF NOT EXISTS `event_mesh_mysql_position` (
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `jobID` (`jobID`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC;

-- 导出 表 eventmesh.event_mesh_position_reporter_history 结构
-- export table eventmesh.event_mesh_position_reporter_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
Expand All @@ -94,57 +88,49 @@ CREATE TABLE IF NOT EXISTS `event_mesh_position_reporter_history` (
PRIMARY KEY (`id`),
KEY `job` (`job`),
KEY `address` (`address`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='记录position上报者变更时,老记录';

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='record position reporter changes';

-- 导出 表 eventmesh.event_mesh_runtime_heartbeat 结构
-- export table eventmesh.event_mesh_runtime_heartbeat structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_heartbeat` (
`id` bigint unsigned NOT NULL AUTO_INCREMENT,
`adminAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`runtimeAddr` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`jobID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
`reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime本地上报时间',
`reportTime` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL COMMENT 'runtime local report time',
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `runtimeAddr` (`runtimeAddr`),
KEY `jobID` (`jobID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_runtime_history 结构
-- export table eventmesh.event_mesh_runtime_history structure
CREATE TABLE IF NOT EXISTS `event_mesh_runtime_history` (
`id` bigint NOT NULL AUTO_INCREMENT,
`job` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`address` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `address` (`address`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='记录runtime上运行任务的变更';

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci ROW_FORMAT=DYNAMIC COMMENT='record runtime task change history';

-- 导出 表 eventmesh.event_mesh_task_info 结构
-- export table eventmesh.event_mesh_task_info structure
CREATE TABLE IF NOT EXISTS `event_mesh_task_info` (
`id` int unsigned NOT NULL AUTO_INCREMENT,
`taskID` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`name` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci NOT NULL,
`desc` varchar(50) COLLATE utf8mb4_general_ci NOT NULL,
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'TaskState',
`state` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '' COMMENT 'taskstate',
`fromRegion` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`updateUid` varchar(50) COLLATE utf8mb4_general_ci NOT NULL DEFAULT '',
`createTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`updateTime` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`) USING BTREE,
UNIQUE KEY `taskID` (`taskID`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 导出 表 eventmesh.event_mesh_verify 结构
-- export table eventmesh.event_mesh_verify structure
CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
`id` int NOT NULL,
`taskID` varchar(50) COLLATE utf8mb4_general_ci DEFAULT NULL,
Expand All @@ -157,8 +143,6 @@ CREATE TABLE IF NOT EXISTS `event_mesh_verify` (
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

-- 数据导出被取消选择。

/*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */;
/*!40014 SET FOREIGN_KEY_CHECKS=IFNULL(@OLD_FOREIGN_KEY_CHECKS, 1) */;
/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@

/**
* for table 'event_mesh_job_info' db operation
* 2024-05-09 15:51:45
*/
@Service
@Slf4j
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@
import org.apache.eventmesh.openconnect.api.connector.ConnectorContext;
import org.apache.eventmesh.openconnect.api.connector.SinkConnectorContext;
import org.apache.eventmesh.openconnect.api.sink.Sink;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendExceptionContext;
import org.apache.eventmesh.openconnect.offsetmgmt.api.callback.SendResult;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -146,6 +148,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
executor.shutdown();
Expand All @@ -159,7 +166,7 @@ public void put(List<ConnectRecord> sinkRecords) {
List<CanalConnectRecord> canalConnectRecordList = (List<CanalConnectRecord>) connectRecord.getData();
canalConnectRecordList = filterRecord(canalConnectRecordList);
if (isDdlDatas(canalConnectRecordList)) {
doDdl(context, canalConnectRecordList);
doDdl(context, canalConnectRecordList, connectRecord);
} else if (sinkConfig.isGTIDMode()) {
doLoadWithGtid(context, sinkConfig, connectRecord);
} else {
Expand Down Expand Up @@ -197,7 +204,7 @@ private List<CanalConnectRecord> filterRecord(List<CanalConnectRecord> canalConn
.collect(Collectors.toList());
}

private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList) {
private void doDdl(DbLoadContext context, List<CanalConnectRecord> canalConnectRecordList, ConnectRecord connectRecord) {
for (final CanalConnectRecord record : canalConnectRecordList) {
try {
Boolean result = jdbcTemplate.execute(new StatementCallback<Boolean>() {
Expand All @@ -217,9 +224,30 @@ public Boolean doInStatement(Statement stmt) throws SQLException, DataAccessExce
context.getFailedRecords().add(record);
}
} catch (Throwable e) {
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, e));
throw new RuntimeException(e);
}
}
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}

private SendExceptionContext buildSendExceptionContext(ConnectRecord record, Throwable e) {
SendExceptionContext sendExceptionContext = new SendExceptionContext();
sendExceptionContext.setMessageId(record.getRecordId());
sendExceptionContext.setCause(e);
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
sendExceptionContext.setTopic(record.getExtension("topic"));
}
return sendExceptionContext;
}

private SendResult convertToSendResult(ConnectRecord record) {
SendResult result = new SendResult();
result.setMessageId(record.getRecordId());
if (org.apache.commons.lang3.StringUtils.isNotEmpty(record.getExtension("topic"))) {
result.setTopic(record.getExtension("topic"));
}
return result;
}

private void doBefore(List<CanalConnectRecord> canalConnectRecordList, final DbLoadData loadData) {
Expand Down Expand Up @@ -291,21 +319,26 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
Exception ex = null;
try {
ex = result.get();
if (ex == null) {
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} catch (Exception e) {
ex = e;
}
Boolean skipException = sinkConfig.getSkipException();
if (skipException != null && skipException) {
if (ex != null) {
// do skip
log.warn("skip exception for data : {} , caused by {}",
log.warn("skip exception will ack data : {} , caused by {}",
filteredRows,
ExceptionUtils.getFullStackTrace(ex));
GtidBatchManager.removeGtidBatch(gtid);
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
} else {
if (ex != null) {
log.error("sink connector will shutdown by " + ex.getMessage(), ExceptionUtils.getFullStackTrace(ex));
connectRecord.getCallback().onException(buildSendExceptionContext(connectRecord, ex));
gtidSingleExecutor.shutdown();
System.exit(1);
} else {
Expand All @@ -314,6 +347,8 @@ private void doLoadWithGtid(DbLoadContext context, CanalSinkConfig sinkConfig, C
}
} else {
log.info("Batch received, waiting for other batches.");
// ack this record
connectRecord.getCallback().onSuccess(convertToSendResult(connectRecord));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public String name() {
return null;
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void put(List<ConnectRecord> sinkRecords) {
if (sinkRecords == null || sinkRecords.isEmpty() || sinkRecords.get(0) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,11 @@ public String name() {
return this.sourceConfig.getSourceConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (!running) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,11 @@ public String name() {
return this.config.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public List<ConnectRecord> poll() {
while (flag.get()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
Throwable t = this.server.close().cause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
isRunning = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
outputStream.flush();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,11 @@ public String name() {
return this.httpSinkConfig.connectorConfig.getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() throws Exception {
this.sinkHandler.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,11 @@ public String name() {
return this.sourceConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
if (this.server != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ public String name() {
return this.sinkConfig.getSinkConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

/**
* Stops the Connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,11 @@ public String name() {
return "JDBC Source Connector";
}

@Override
public void onException(ConnectRecord record) {

}

/**
* Stops the Connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ public String name() {
return this.sinkConfig.getConnectorConfig().getConnectorName();
}

@Override
public void onException(ConnectRecord record) {

}

@Override
public void stop() {
producer.close();
Expand Down
Loading

0 comments on commit 2ba54c7

Please sign in to comment.