Skip to content

架构设计

myksl edited this page Apr 27, 2022 · 12 revisions

1.整体架构与部署


上图为简单的部署架构图,下面简单解释一下该图:

  1. 一个Reader集群有多个Reader服务实例组成,一般为跨机房部署,一个Reader集群中所有的Reader服务实例完全一致,多个实例的目的主要为容灾和压力负载均衡。
  2. 一个Reader任务只拉取一个MySQL集群实例的binlog,所以Reader任务与MySQL集群的对应关系是1对1的关系。Reader任务默认是拉取从库的binlog。关于Reader服务拉取binlog是否会对从库或者主库有影响,DBA官方给出的结论是微乎及微的影响
  3. 一个Reader服务实例实例可以有多个Reader任务,所以Reader集群与MySQL集群的对应关系是1对N的关系。
  4. 一个SDK集群有多个Writer服务实例组成,一般也为跨机房部署,每个SDK任务的分片数会平均分配到集群中每个实例上。

2.模块说明

2.1.Reader模块

  1. 实现了MySQL Replication协议,伪装为一个MySQL Slaver,拉取MySQL的binlog。
  2. 管理表结构(下面称为:Schema),根据Schema结构,将binlog解析并缓存起来(支持磁盘和内存两种缓存模式)。
  3. 管理与SDK的链接,并根据SDK的订阅位点,将指定位点的binlog发送给SDK(默认使用ProtoBuf编码发送binlog数据)。
  4. 一个Reader服务可以容纳多个Reader任务,一个Reader任务关联一个数据库集群。


2.1.1.存储层实现

Ptubes存储层分为两部分, 内存存储和文件存储 内存 使用堆外内存存储数据,可以配置不同任务使用的大小;使用堆内内存存储索引。

缓存由Index和EventBuffer组成,index和eventBuffer都是基于PtubesEventBuffer实现,支持高效读写;
Index存储的是EventBuffer中每个block的索引,数据写入EventBuffer后,会更新Index,寻址的时候,会先在Index中定位数据所在的block(为提升找点效率,使用二分法进行找点),然后遍历EventBuffer中对应block的所有数据,根据SDK请求的存储点以及分片信息返回数据给SDK
由于Ptubes支持时间、binlog位置以及gtid寻址,因此Index结构如下:

文件存储
文件存储结构

文件树形图如下

存储数据结构
BinlogInfo:用于唯一标记一个event的binlog信息,index文件中的key值

字段名 length(byte) 含义
changeId 2 初始值为0,每发生一次切库,changeId+1,用于保证切库之后,索引还是递增的
serverId 4 从库的serverId
binlogFile 4 从库binlog文件编号
binlogOffset 8 在binlog文件内的偏移量
uuid 16 当前事务的gtid的uuid
transactionId 8 当前事务的gtid的transactionId
eventIndex 8 当前event在事务中的index
timestamp 8 binlog时间戳

CheckPoint:SDK中用于唯一标记一个event的信息

字段名 length(byte) 含义
serverId 4 从库的serverId
binlogFile 4 从库binlog文件编号
binlogOffset 8 在binlog文件内的偏移量
uuid 16 当前事务的gtid的uuid
transactionId 8 当前事务的gtid的transactionId
eventIndex 8 当前event在事务中的index
timestamp 8 binlog时间戳

DataPosition结构:用于标识index或者data所在的文件以及位置

字段名 length(bit) 含义
date 18 binlog日期
bucketNumber 14
offset 32 data或index文件offset

MaxBinlogInfo文件:存储层用于存储binlog拉取的位置,重连或者服务重启的时候,从该位置继续拉取binlog,定时刷新

//包含BinlogInfo所有字段
serverId=slaveServerId, 
binlogFile=2, //初始值必须是准确的
binlogOffset=774007788,//初始值可以设置为4
binlogTime=2018-12-25 17:51:13,
refreshTime=2018-12-25 17:51:13,
gtid=ec4dafec-8a5b-11e8-81dc-00221358a93e:949976,  
gtidSet=ec4dafec-8a5b-11e8-81dc-00221358a93e:1-963237

ChangeIdInfo文件:存储changeId与对应从库的binlog起始位置信息,服务启动的时候检查serverId是否与上一次相同,如果不同,则追加一条changeId,并保存,否则不做变更

changeId=XX,serverId=XXX,gtidSet=XXXX,beginBinlogFile=XXX,beginBinlogOffset=XXX
changeId=XX,serverId=XXX,gtidSet=XXXX,beginBinlogFile=XXX,beginBinlogOffset=XXX
changeId=XX,serverId=XXX,gtidSet=XXXX,beginBinlogFile=XXX,beginBinlogOffset=XXX

文件以小时为单位按时间分路径存储,每个Data bucket 1G,L2Index bucket和Data bucket是一对多关系

L1Index文件结构

L2Index文件结构

Data文件结构

L1Index文件命名:l1Index.index

L2Index文件命名:bucket-序号.index

data文件命名:bucket-序号.data

读写流程

写流程

EventProducer线程处理event存储的时候,同时更新缓存和文件

写文件使用DataOutputStream,将数据写入缓存区,异步flush落盘 EventProducer线程:负责更新Buffer、文件以及Index
Flush线程:负责定期Flush Data文件和Index文件
CleanUp线程:负责定期清理Data文件和Index文件
Flush条件:

  1. 新建bucket
  2. 写完一个block
  3. 定时flush
  4. 程序退出时flush

读流程

根据SDK请求时候带的binlogInfo进行寻址

  1. 根据内存buffer binlogInfo范围判断数据是否在内存中,如果在内存中,跳转到步骤2,如果查找的binlogInfo小于内存的最小binlogInfo,跳转到步骤4,如果大于内存最大的binlogInfo,返回结果
  2. 查找内存index确定block offset
  3. 遍历data buffer对应的block查找需要的数据,根据分片和存储点进行过滤并返回
  4. 如果查找的binlogInfo小于Index的最小binlogInfo,返回结果
  5. 二分法查找Index,获取data文件名和offset
  6. 从data文件offset处开始遍历数据,返回SDK订阅表对应分片的变更

Data寻址
L1Index会映射到内存,根据binlogInfo参数中的有效值检索到L2Index文件,然后再从对应的L2Index文件中寻找到正确的DataPosition
寻址方式开关
寻址方式通过开关控制,提供GTID_OFFSET,BINLOG_OFFSET,GTID三种种方式

  1. GTID_OFFSET:binlog_order_commit=ON前提下,binlog中gtid是严格递增的,可以根据GTID_SET生成递增的gitdOffset(long值)进行索引
  2. BINLOG_OFFSET:根据binlogNumber和offset生成递增的binlogOffset进行索引
  3. GTID:binlog_order_commit=OFF的时候,binlog中gtid并不是严格有序的

不同场景下寻址依据
当寻址模式为BINLOG_OFFSET时,Mysql开启GTID以及未开启GTID情况下,不同的场景Index文件检索依据如下

寻址判断依据
SDK第一次请求 timestamp
SDK回溯 timestamp
切库 changeId+serverId+binlogFile +binlogOffset
正常情况 changeId+serverId+binlogFile +binlogOffset

寻址比较算法
BinlogInfo比较算法

if 只有时间信息有效
  比较时间
  
if changeId相同
  比较binlogFile和binlogOffset
else
  比较changeId

由于是稀疏索引,从Index文件中找到dataPosition后,需要进行遍历直至找到准确的事务,由于事务中每个event的binlogInfo相同,遍历事务的时候,不需要遍历所有的event,只需解析事务的第一个event中的binlogInfo信息,即可判断是否找到

GTID寻址比较算法
根据GTID寻址不需要使用index文件

  1. 按时间倒序遍历Data文件,读取Data文件开头的Previous_Gtids,判断请求的gtid是否在Previous_Gtids中,
  2. 如果Previous_Gtids包含SDK请求的gtid,遍历上一个Data文件,跳转到步骤1
  3. 如果Previous_Gtids不包含SDK请求的gtid,遍历当前Data文件,解析每个事务的第一个event中的gtid,直到找到为止,找到后,从下一个事务开始读取数据返回给SDK

Binlog压缩
压缩
Ptubes会对一定时间之前的数据做压缩处理, 压缩算法为GZIP, 通过压缩来降低对磁盘的占用
数据清理
缓存由于是使用的是PtubesEventBuffer,数据通过LRU淘汰,而且缓存数据多于磁盘数据,不影响正常的读写,因此无需专门的清理任务做清理
磁盘清理方式:
1.定时任务清理
采用定期清理的方式进行数据清理,可配置清理时间和周期,磁盘数据默认仅保留最近24小时的数据,清理分两个流程清理

  1. force delete不可读文件
  2. mv当前过期的可读文件,可读文件变不可读
    清理的数据包括data数据以及Index数据,Data数据和L2Index数据由于是按小时存储的,可以直接mv整个路径,L1Index数据是都在一个文件中,通常最多不会超过一万条数据,所以在更新L1Index的时候进行L1Index的过期数据清理

2.2.Ptubes-Sdk模块

Ptubes-sdk是Ptubes工程的客户端。通过与reader之间进行http通信获取最新的数据变更(参考“3.Ptubes-sdk与Ptubes-reader”)。Ptubes-sdk利用Apache Helix实现了多分片之间的负载均衡(参考 “2.Ptubes-sdk与Apache Helix”)。由于Apache Helix依赖zookeeper,因此Ptubes-sdk也直接依赖zookeeper。

2.2.1线程模型

Ptubes-sdk 多机情况下示例图
针对一个同步任务 taskA,运行在两个不同的进程中bizServerProcessA 和bizServerProcessB。其线程结构如下
提示:

  1. 队列中每个“信封”代表一批数据;回调用户时也是按批回调,每批的大小可以调整 [1,n) ,当批次大小为1时相当于依次单条串行回调。
  2. 同一个Ptubes-sdk任务可以运行在多个java进程中。同时,同一个Ptubes-sdk java进程中可以运行多个不同的Ptubes-sdk任务。由于高可用以及一致性等角度的考量,你不能在同一个线程内启动同一个任务两次。
  3. 同一个Ptubes-sdk任务订阅的具体Ptubes-reader可能是不同的,具体依赖“高可用线程”进行动态的reader IP的选取。
  4. Ptubes-sdk的分片。是为了提高数据同步吞吐而诞生的一种并行化手段,在启动任务时,你可以为每个表指定一个分片字段,Ptubes-sdk会根据分片字段的值按照一定的哈希规则(根据分片字段取余hash),将同一个reader任务的数据散列到不同的子数据流中,每一个子数据流对应一个worker线程。显而易见,使用分片可以提高同步效率,同一个子数据流内的数据是顺序发送给下游的。不同子数据流内的数据并行发给下游,且它们之间没有顺序相关的保证。
  5. 由于Ptubes-sdk的各个子数据流(各个分片对应的数据流)之间的同步是并行的,因此我们为每一个分片保存了一个“存储点”信息。分片的“存储点”信息保存了当前分片对数据流消费的位点情况,随着数据流的推移,“存储点”也会逐步推进。在故障恢复、进程间分片迁移等场景我们可以利用“存储点”还原恢复不同分片的消费场景,并根据指定“存储点”重新订阅reader,保障了数据流的一致性。
  6. “存储点”是异步刷新到持久化存储介质上的,具体可以参考“2.Ptubes-sdk与Helix”中关于存储点的介绍。

2.2.2.Ptubes-sdk与Apache Helix

Ptubes-sdk依赖Helix进行分片调度,因此也是强依赖zk的。每个Ptubes-sdk任务对应helix的一个“集群”,同时我们会为每个任务创建一个名为“default-resource”的默认“Helix资源”,每一个Ptubes-sdk的分片对应Helix“default-resource”资源的一个partition。helix通过对“default-resource”资源的负载均衡调度,间接完成了对Ptubes-sdk任务分片负载均衡的调度。

关于Helix的具体概念可以在Helix获取介绍。

2.2.3.Ptubes-sdk的存储点

正如Helix依赖zookeeper进行分片调度一样,Ptubes-sdk同样将存储点信息保存在zookeeper上,并使用了Helix提供的保存用户自定义数据功能https://helix.apache.org/1.0.2-docs/tutorial_propstore.html
对于一个拥有3个分片的Ptubes-sdk任务taskA而言。其存储点在zookeeper上的目录结构为

Znode Path      tips      Znode Value Demo
/taskA/PROPERTYSTORE/partition-storage 用来在分片迁入迁出、调整分片数量、修改“存储点”等场景,临时保存新的分片数量和存储点信息等。 "{
  "id":"partition-storage"
  ,"simpleFields":{
    "partition-storage":"<br{"partitionNum":11,"buffaloCheckpoint":{"uuid":"00000000-0000-0000-0000-000000000000","transactionId":-1,"eventIndex":-1,"serverId":-1,"binlogFile":-1,"binlogOffset":-1,"timestamp":-1,"versionTs":1622204269854,"checkpointMode":"LATEST"}}"
  }
  ,"listFields":{
  }
  ,"mapFields":{
  }
}"
"/taskA/PROPERTYSTORE/0_taskA
/taskA/PROPERTYSTORE/1_taskA
/taskA/PROPERTYSTORE/2_taskA"
保存某个分片的“存储点”情况 "{
  "id":"-1"
  ,"simpleFields":{
    "checkpoint":"{"uuid":"c055a63d-99c7-11ea-a39c-0a580a24b346","transactionId":54839254,"eventIndex":0,"serverId":36179070,"binlogFile":68,"binlogOffset":130958732,"timestamp":1624612320000,"versionTs":1622204269854,"checkpointMode":"NORMAL"}"
  }
  ,"listFields":{
  }
  ,"mapFields":{

  }}"

存储点的结构为

名称 类型 功能 示例
uuid 字符串 事务 gtid 的uuid c055a63d-99c7-11ea-a39c-0a580a24b346
transactionId 64位整型 事务 gtid 的事务id 54839254
eventIndex 64位整型 事务中行号 0
serverId 32位整型 Server ID 36179070
binlogFile 32位整型 binlog文件号 68
binlogOffset 64位整型 binlog文件偏移 130958732
timestamp 64位整型 事务时间戳 1624612320000
versionTs 32位整型 存储点版本信息 1622204269854
checkpointMode 字符串 "枚举类型:存储点类型
EARLIEST
LATEST
NORMAL"
NORMAL

2.2.4.Ptubes-sdk与Ptubes-reader

sdk与reader之间通过3个http接口完成整个数据订阅和消费流程。

URI 功能 备注
/v1/getServerInfo 获取reader信息 获取某reader信息
/v1/subscribe 订阅 对某个readerTask进行订阅,需要在“获取reader信息”后调用
/v1/fetchMessages 拉取数据 拉取一批数据,需要在“订阅”成功后进行调用

具体的状态转换如图

3.实现细节

3.1.数据一致性保证

关于数据一致性保证,Ptubes系统承诺保证数据的最终一致。关于数据一致性保证,我们主要通过两个方面保证:

  1. binlog解析存储阶段保障 Writer服务会将Writer任务每个分片的消费持久化起来(每间隔两分钟持久化一次),所以无论是实例重启还是分片转移,都可以保证按照分片的当前位点重新订阅数据并拉取消费。
  2. binlog消费阶段保障 (1)定时记录消费位点,宕机/重启后从上一次记录的消费位点开始继续消费;

3.2.并行化消费


  1. 分片纬度并行:根据分片字段hash分配分片,每个分片独立消费位点,分片之间并行消费;
  2. 任务纬度并行:不同的任务独立消费位点,任务并行消费;
  3. 机器纬度并行:支持水平扩容;

3.3.保证有序性

Ptubes中跟顺序有关的名词如下:

  • 分片数:Ptubes往下游写数据的并发数,Ptubes保证分片内的数据有序发送。
  • 分片字段:每个表可以指定表中的一个字段作为分片字段,分片字段值经过Ptubes内部hash算法后对分片数取模可计算得到处理该条数据的分片号。 拿MQ接入方式举例,假设分片字段是id,Ptubes分片数是3,Mafka分片数是5,则不同id的数据在Ptubes和MQ中的分片分布情况如下图:
    同一行数据,只要分片字段值没有变更,可以保证被hash到同一个分片串型处理,保证行级别的有序。
    1.分片字段的选择
    针对分片字段的选择,有以下几点建议:
  • 建议选择主键或者唯一键做分片字段,可以保证数据尽量均匀打散到各个分片中,避免某一个处理的数据过多造成热点导致延迟。
  • 建议选择不会变更的字段作为分片字段,分片号的计算是根据这个字段当前的值计算得到的,如果分片字段会发生改变,可能导致同一行数据的两次变更计算得到的分片号不是同一个,影响顺序。 2.分片字段的修改 服务接入后,除了因顺序问题影响下游服务处理数据之外,原则上不允许修改分片字段。