-
Notifications
You must be signed in to change notification settings - Fork 413
2.1_MySql Reader
Mysql-Reader插件用于Mysql数据库增量日志解析和数据模型转换,适用于需要实时同步mysql数据的场景,例如:EDA、CQRS、BigData、数据库升级、迁移与备份,以及业务缓存刷新等。
如上图所示,Mysql-Reader内部处理流程主要分为两个阶段:
- 获取Binlog。MysqlReader集成了阿里开源的Canal组件来采集binlog,CanalServer的生命周期和Reader保持一致,CanalServer启动之后会模拟mysql slave向mysql master发送dump请求,mysql master收到dump请求之后会向canal推送Binlog,Canal内部将binlog事件解析为内部的Entry,MysqlReader通过CanalServer提供的api获取这些Entries。
- 数据转换。该阶段主要对Entries进行模型转换,由MessageParser将canal组装好的Entry对象转换为自定义的RdbEventRecord对象。
Mysql-Reader的具体工作原理分如下三个阶段介绍:
-
【启动canal】
> build canalFilter
根据源端要同步的库名和表名生成canal的过滤表达式,过滤不需要同步的库和表。
> generate CanalInstance
首先进行组装CanalConfig、创建AlarmHandler、动态生成slaveId等准备工作。其中,组装CanalConfig主要从MysqlReaderParameter的参数配置中获取,包括组装源端数据库、配置起始位点、内存缓冲区大小、HA模式(限定为HEARTBEAT)、心跳检测SQL和频率、黑名单等。
然后,创建具体的CanalInstance,主要完成以下工作:
(1)初始化MetaManager,设置位点管理器和canal过滤器;
(2)启动EventParser,设置支持的Mysql的Binlog类型和格式;
(3)初始化EventSink,设置数据处理的模式。根据源端master数据库的个数设置EventSink:当master数据库个数>1且GroupSinkMode为Coordinate时,将EventSink初始化为FixedGroupEventSink,支持事务保留,其他情况下将EventSink初始化为EntryEventSink。
> start canalServer
启动canal服务器,并从相应位点发起客户端消费订阅,同时订阅filter的变化。这里的MysqlReader相当于一个canal的消费端。
注: 默认使用MetaManager中记录的最后一次消费位点,若为空,则使用canal实例存储的第一个位点。 -
【获取数据】
> get Message withoutAck
canal服务器在检查确认消费端(此处即为Task)已启动并发起订阅之后,才会拿到该客户端的canal实例进行数据获取。首先判断该实例的上批次数据是否仍存在,若存在,则继续使用上批次events,不再获取新数据,若不存在,则根据上批次的batchId来获取本批次events。然后将获取到的events转换为entrys,并把该批次的batchId和entrys封装为Message返回给Mysql-Reader。随后,Mysql-Reader再对Message数据做具体的处理,包括计算该批次数据对应的binlog日志大小、dump数据详情、将entrys解析为RdbEventRecord等。
> batchTimeout
Mysql-Reader利用canal服务器获取数据之前,会首先判断Task是否启动了获取批量数据的超时时间控制(batchTimeout),默认不进行超时控制,当无数据时,进行轮询处理。若进行超时控制,当无数据时,则按照超时时间处理。
注:轮询处理时,为避免空循环机器挂死,允许最多重试3次,超过3次,最多sleep10毫秒。
-
【解析数据】
> do filter
canal返回的entrys首先需要处理数据过滤,主要包括Transaction Begin/End过滤、回环表以及回环数据过滤、canal心跳表数据过滤。
> parse Entry
首先将Entry中的数据转换成RowChange,得到其事件类型EventType,然后根据不同的EventType对Entry进行处理。对于Query事件类型的Entry,直接过滤;对于ddl事件类型的Entry,若Mysql-Reader开启了同步ddl开关,则将其转换为RdbEventRecord对象,若未开启则直接忽略;对于dml事件类型的Entry,可以从rowData的beforeColumns和afterColumns中得到所有主键和非主键列变更前和变更后的数据,并按照EventColumn的index进行排序,最后将其转换为RdbEventRecord对象。
-
【HA机制】
> canal的ha分为两部分,canal server和canal client分别有对应的ha实现:
canal server:为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
canal client:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。> mysql的ha通过配置多个读IP实现,当源端数据库出现异常时,能够自动切换到健康的DB。
-
【过滤机制】
> 过滤Binlog:过滤不需要同步的数据库、数据表、事务头、事务尾、回环表、心跳表、以及Query类型的数据等,只保留需要同步的Binlog。
> 过滤事件:可以设置需要过滤的特定事件类型的Record(INSERT/UPDATE/DELETE,可多选)。
-
【dump数据详情】
> Mysql-Reader拿到canal返回的Message之后,可以设置其进行dump Records的详细信息。由于数据量比较大,默认不进行dunp。
-
【消费起始时间】
> Mysql-Reader可以配置消费Binlog的起始时间戳,当Task第一次启动时,若指定了起始时间戳,则从基于指定时间戳最近的时间开始寻找消费位点;若不指定起始时间戳,则默认从源端数据库的当前最新时间开始寻找消费位点。
> Task同步过程中,可以重置消费位点,即将消费时间改为过去的某个时间,则Mysql-Reader将从新的时间戳开始寻找消费位点,实现当在数据出现异常时的追数据、补数据的功能。
-
【支持ddl同步】
> Mysql-Reader可以设置启用自动同步ddl类型的Events,将其转换为RdbEventRecord对象,主要用于往关系型数据库的同步。系统默认开启该功能。
-
【其他类型数据源支持】
> 基于canal的设计机制,Mysql-Reader源端支持的数据源类型除了Mysql,还支持MariaDB 5.5.35和10.0.7(理论上可支持以下版本),以及部分oracle版本。
-
【MysqlReaderParameter】
在继承Reader插件通用参数基类(PluginReaderParameter,详见深入Task)的基础上,MysqlReaderParameter还根据canal的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。MysqlReaderParameter扩展参数 参数描述 默认值 备注 dumpDetail 是否需要dump records详情
false 可以设置dump records详情 startTimeStamps 起始时间戳 源端数据库当前最新时间 Task第一次启动时,通过该时间戳寻找位点 fallbackIntervalInSeconds 数据库发生切换查找时回退的时间 60(单位:s) 当源端数据库出现问题发生DB切换时,默认canal回退60s查找位点 batchTimeout
获取批量数据的超时时间
-1L(单位:ms)
-1代表不进行超时控制,0代表永久,>0则表示按照指定的时间进行控制 messageBatchSize
Message订阅批次大小
2000
参数调大了可以增大系统的吞吐率,调小了增加同步的实时性 memoryStorageBufferSize 缓存记录数 32 * 1024(单位:byte) 内存存储的buffer大小 memoryStorageBufferMemUnit 缓存记录单元大小 1024(单位:byte) 内存存储的buffer单元大小 detectingSQL 心跳检测sql select 1 Reader插件与源端数据库之间有心跳检测线程 detectingIntervalInSeconds 心跳检测频率 3(单位:s) detectingTimeoutThresholdInSeconds 心跳超时时间 30(单位:s) detectingRetryTimes 心跳检测失败重试次数 3 defaultConnectionTimeoutInSeconds 连接超时时间(sotimeout) 30(单位:s) receiveBufferSize mysql连接接收到的BufferSize 64 * 1024(单位:byte) sendBufferSize mysql连接发送的BufferSize 64 * 1024(单位:byte) GroupSinkMode 分组同步模式下的Event-Sink模式 Coordinate Coordinate,//所有eventparser必须相互协同,保证event在时间序列上全局有序,保证所有分库必须都同时正常才进行数据同步(生产环境需用该模式,保证数据全局有序,把所有分库看成一个整体)
Separate;//eventparser各自进行数据同步,不需要相互协同,event只是局部有序,一个分库出现问题不会影响其它分库数据同步(测试和预生产推荐使用该模式,因为测试和预生产数据量比较小,很容易出现某个分库长时间没数据的情况,这种情况下会出现协调等待,其它分库的数据也无法同步了)
filteredEventTypes 需要过滤的某些特定类型的事件 无 INSERT,UPDATE,DELETE(可多选) blackFilter 正则表达式匹配表黑名单,忽略解析 .*\\._.* 默认过滤所有以"_"开头的表 -
【RdbEventRecord】
Mysql-Reader插件将每条Mysql的变更数据抽象为RdbEventRecord。其主要参数如下:RdbEventRecord参数 参数描述 备注 tableName
数据表名
schemaName
数据库(实例)名称
EventType
变更数据的业务类型(I/U/D/C/A/E)
executeTime
发生数据变更的业务时间
oldKeys(List<EventColumn>)
变更前的主键值
和oldColumns不同的是,只有主键发生变化时,才需要给oldKeys设置值;
而oldColumns,不管前后是否发生更新变化,都会赋值
keys(List<EventColumn>)
变更后的主键值
如果是insert/delete,变更前和变更后的主键值是一样的
oldColumns(List<EventColumn>)
变更前非主键的其他字段
columns(List<EventColumn>)
变更后非主键的其他字段
sql
对应的sql语句
当eventType = CREATE/ALTER/ERASE时,就是对应的sql语句,其他情况为动态生成的INSERT/UPDATE/DELETE sql
ddlSchemaName
ddl/query的schemaName
会存在跨库ddl,需要保留执行ddl的当前schemaName
hint
生成对应的hint内容
RSI Record资源标识符
-
【Mapping相关参数】
一个Mapping代表了一个同步方向,即要将哪个库的哪张表的哪些字段和数据同步到哪个库的哪张表中。映射关系为一对多,即一张表可以同步到多个目标端数据源。Mysql-Reader插件的相关Mapping配置参数如下:Mapping参数 参数描述 备注 taskId
所属Task的id
每个映射均属于一个Task sourceMediaId
源端表的id
源表名称的模式有:
SINGLE,//正常单表名称
MULTI,//支持类似"offer[0000-0031]"的分库分表模式
WILDCARD,//支持全库同步的通配符
YEARLY,//支持按年分表的表名模式
MONTHLY;//支持按月分表的表名模式
关联技术 | 稳定版本 | 待测版本 |
---|---|---|
canal | 1.0.24 | |
mysql | 5.7及以下 |
-
【变更表结构的一些限制】
需要明确一点:
* Binlog对每条日志事件,并没有记录列名,只是按顺序记录了每个列的值
* com.alibaba.otter.canal.protocol.CanalEntry中的列名、列类型等信息,是canal拿到binlog数据,通过反查元数据表构造出来的
所以,参与数据同步的表不能进行如下类型的表结构变更,否则Binlog回溯时可能会出现错误变更类型 描述 Table-Rename 表重命名之后,如果进行binlog回溯,重命名之前binlog中保存的表名在数据库当前元数据中查不到
canal会抛异常Column-Rename 列重命名没有binlog回溯的问题
Column-Rename不支持,是因为脚本检测的时候,涉及到黑白名单的检查,实现起来比较麻烦,暂未实现Column-Drop 列删除之后,如果进行binlog回溯,删除前binlog中列的数量和数据库当前元数据中列的数量不一致,
canal会抛异常,即使采取不抛错的策略,构造出来的【列名和列值映射】也是错误的Add-After-Column 增加列的时候,只能增加到末尾
否则回溯binlog的时候,Add操作之前binlog中列的顺序和数据库当前元数据中列的顺序不一致,
构造出来的【列名和列值映射】是错误的Modify-After-Column 原因同Add-After-Column ps:binlog回溯的相关问题可参考canal源码,com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert
-
【canal说明】
Mysql-Reader插件依赖于canal提供数据库日志,针对mysql数据有一些要求,具体请查看: https://github.com/alibaba/canal/wiki/QuickStart
注意:目前canal支持mixed,row,statement多种日志协议的解析,但配合DataLink进行数据库同步,目前仅支持row协议的同步,使用时需要注意。