分布式dataX CDC与关系/图(neo4j)增量同步(完整版)

1. 背景

数据增量同步是ETL关键功能,在全量同步后,持续增量同步,保证数据的完整,正确和时效,通常有两种方式实现,双写和CDC

双写 优点,实现简单, 写入源库同时写入目标库;缺点,代码侵入,影响正常业务

CDC 优点,无侵入,读取数据库log,获取数据变更;缺点,复杂,需要引入CDC组件,从数据变更(表/行/字段变更)到目标增量变更(通常是DTO)需要复杂的映射

Cdc组件本身通用设计,支持扩展redis,elasticsearch等数据库同步

本文包括两部分,cdc同步框架和基于cdc同步框架的关系/图增量同步设计

2. 参考和术语

CDC change data capture 数据变更抓获

RBT 基于规则的转换组件

《分布式datax架构设计》

《分布式datax详细(落地)设计》

《分布式时间槽设计》

3. SETL介绍

下图介绍SETL逻辑架构和规划

setl-rbt 全量同步组件,datax组件,接入分布式调度,实现高性能的全量同步

setl-cdc cdc增量同步datax组件,接入分布式时间槽实现高可靠增量,后续规划接入kafka connect

setl-stream 规划中,流式etl,引入kafka connect,实现高吞吐低延时的增量同步

config-center 配置中心,datax原生使用本地文件配置,配置中心摆脱本地文件限制,实现分布式系统的必要基础设施

rb-transformer 基于规则引擎的转换器组件,针对datax record

selt-data 相当于spring data,数据读写

4. Debezium CDC原理

Debezium cdc组件,支持多种关系数据库,如sqlserver,db2, oracle,mysql等,也支持newsql/nosql, 如Cassandra,mangodb,抽取并解释数据库日志获取数据变更,支持以事件/监听模式消费事件,debezium支持ack机制,实现事件消费的可靠性

下图是dbz主要模块

Api dbz引擎定义,目前只有一个实现,EmbeddedEngine

Embedded dbz引擎的实现,顾名思义,可以嵌入到用户的代码,轻量级的,称为api方式

Server dbz预置的事件分发服务,开箱即用,实现分发变更事件到redis,plusar和其他云消息服务,称为dbz engine server方式,但其依赖Quarkus,一个对标spring cloud框架,如果你已使用spring cloud构建你的服务,可能对该方式不是很感兴趣

connector 官方推荐的deploy方式,如下图,接入kafka connect,connector作为kafka connect的source,捕获源数据库变更,以事件方式推动到kafka,订阅事件的sink,写入到目标库,该方式充分利用kafka特性

前2种方式没有接入kafka,使用的是相同的connector,但也依赖kafka类,如事件,消费接口等

5. setl data cdc开发框架

上图是setl cdc sdk,虚线框是dbz的类,其他属于estl-data-cdc组件

DbzEngineClientConfiguration 构建和初始化dbz引擎客户端,支持读取本地配置文件,配置中心

DebeziumEngineClient dbz引擎客户端,负责启动/停止dbz引擎

DispatchChangeEventConsumerImpl cdc事件消费实现,接收原生事件,负责批量ack

DispatchChangeEventListenerImpl/ChangEventListener setl-cdc组件

1) 接收原生变更事件,读取和解释到RowDataBean,使用内部模型,如,操作类型,数据的before和after,解耦对dbz的依赖,也更方便后续使用

2) 分类变更事件,目前只有行数据(row)事件, 过滤掉query操作,分发给ChangeEventRowListener

BinlogSync ChangeEventRowListener实现,实现订阅服务,依据ChangeEventHandler设置感兴趣的db.table和增删改类型调用处理器

开发人员实现自己的同步业务只需实现ChangeEventHandler即可,sdk隔离dbz cdc的内部机制

6. 关系/图(neo4j)增量同步设计

增量同步是cdc开发框架应用,cdc告诉我们哪个库哪个表哪行哪些字段变更,但增量同步的目标库,如,neo4j,elasticsearch存储相当于dto,因此需要映射计算框架,从表行字段的变更映射为变更的dto

Ø 配置和执行

上图是CDC模型,包括CDC定义,映射Action设计,属于rb-transformer模块

CDC cdc配置模型,同时负责选择和执行适用cdc动作

CdcAction cdc动作,数据变更映射到目标库变更的逻辑实现

CdcRule cdc动作是否适用数据变更,CdcRule返回布尔值

CDCRunningContext cdc运行上下文

Ø datax reader

Cdc动作和变更事件处理器datax实现,该实现接入到datax reader,使用datax的reader->transform->writer机制,cdc发出数据变更事件,cdc动作负责映射为目标变更,transform规则转换,最后到writer,目标变更写入目标库

Ø 同步实现时序

同步实现时序展示数据变更事件分发,CDCAction执行

关键步骤:

1.2/1.21 ChangeConsumer负责批量ack

1.3/1.4 解释变更事件,并转化为内部RowDataBean对象,解耦dbz

1.5 BinlogSync是ChangeEventRowListener实现,从这开始进入同步业务

1.6/1.7 查找对表变更感兴趣的cdc,表粒度较大,初步筛选,1.12 精确筛选

1.12/1.16 onConditional通过规则返回action是否适用(RowDataBean),若适用执行

1.15 CdcReaderAction 依据RowDataBean,源数据变更映射为目标变更,同步到目标库

Ø 关系/图CDC配置示例

1) 场景

上图cdc示例schema,覆盖3类场景

1. 连接表关系变更 film->film_category->category

2. 外键关系变更 customer->store

3. 主从表多对一变更 store-address对应图库的store顶点

2) 配置示例解释

Cdc挂在rbt转换下,两者并无直接关系,这样设计主要方便cdc编写时参考

Tables标签 该cdc相关的表,可设置多个表

Insert/update/delete标签 分别对应增删改分类的cdc动作,实际上,insert的事件可以产生update的action,该分类只是管理维度

Action标签 数据变更对应的动作,映射源变更为目标变更

上图展示源customer表insert引起目标图库两个动作

1) 新增的customer顶点

2) 如果新增customer带有store外键,新增customer->store关系,适用规则

r.afterField(store_id)!=null,插入数据 store_id不为空

上图源customer变更引起目标变更,

规则 !(r.updates().size()==2 &&r.isUpdate('store_id'))*

判断数据变更除了store_id还有其他字段变更才出发,store_id变更,引起后面两个动作,删除旧关系,新增新关系,关系rule是一样的,参数来源不一样,删除的store_id来源于before,新增来源于after

*数据行lastUpdate必然变更,所以r.updates().size()==2

上图删除变更映射,customer删除,首先删除关系,再删除节点

Ø 退出策略(TBD)

Cdc是事件流,事件源源不断产生,认为是永续执行,目前cdc接入datax,定时调度执行,因此需要一个机制退出,待下次定时调度再执行,例如,执行时长,未接收到事件时长等

7. 数据可靠性

可靠channel

可靠channel,可确认的分布持久的channel,Channel不可靠对于CDC是致命的,丢失数据;但对于全量同步可以接受,全量同步故障转移后,整个分片重新同步即可。可靠channel对于数据量比较大,没有分片的情况也非常有价值,相当于断点续传的能力,但对性能有一定影响

Ø CDC原生Datax channel分析

整个数据链路包括2部分,

第一段,CDC变更事件推送到reader,reader写到Exchanger(Channel)*成功后ack CDC

第二段,writer从Exchanger拉取数据变更,同步到目标存储

另外,Channel 承担流量统计和流控的职责

可以看到,第二段是不可靠的,MemoryChannel底层使用内存ArrayBlockingQueue存放数据,datax节点崩溃,故障转移后,原节点Channel的数据将丢失

*Buffered类型Exchanger缓存Record,批量提交,存在丢失可能,reader需要非buffered Exchange配合,writer可以适用buffered或非buffered

Ø 可靠channel设计原理和实现

1) 实现方案-推模式

数据链路同样的两个阶段,不同的是第二阶段,channel引入mq作为持久存储,提供可确认,方案改变原数据链路,数据从mq获取,writer依赖mq,从而也改变了writer开发模型,6.1/6.2只是激活pull统计,获取的数据并不使用。6.1/6.2放在5~7之间,是为了pull统计更准确

2) 实现方案-拉模式

同方案1,引入mq,不同的是,mq作为本地queue持久存储,Channel封装起来,writer不需要依赖mq,数据链路与原生一样,主动获取mq消息。本方案保持数据链路形态,即writer通过RecordReceiver获取Record。缺点,Exchanger/Channel增加ack方法,主动消费,涉及消费异步ack问题

Ø 推模式下channel统计

推模式下,旁路读取record,读取record通过消息引擎,需要通知channel读取了record,channel计算record的大小,发起统计

RecordReceiver接口增加byPassReader方法

public void byPassReader(Record record);

7 分布式dataX CDC

分布式dataX CDC有两种可选方式,分布式作业分布式时间槽

分布式作业在《分布式dataX详细(落地)设计》介绍过,dataX CDC单分片,使用分布式作业,只有一个worker作业工作,其他worker作业备用状态,资源利用率不高,因此,分布式时间槽比较合适

Ø 技术架构

下面介绍分布式dataX CDC的技术架构,下图是分布式datax和分布式datax CDC技术架构对比, 前者使用分布式作业,后者分布式时间槽,通过对比更好了解分布式dataX CDC

上为分布式dataX CDC, 使用分布式时间槽模式;下为分布式dataX,使用分布式作业模式

1. 作业节点

dataX CDC:分布式作业分片对应dataX作业,是standalone模式的datax engine

dataX:作业(worker)节点分片是dataX作业分片,是任务组模式的datax engine

2. Client

dataX CDC:管理台api写入分片,没有专用client

dataX:专用的client,作业模式下Datax Engine,使用分布式调度器,负责分片和分配分片

3. 配置中心

dataX CDC:配置放入配置中心,避免每个节点存放,影响动态伸缩和维护,当然也支持本地配置文件,每个worker节点配置所有的dataX CDC作业,用于failover

dataX:作业配置直接写入分片的config节点,其他配置也可使用配置中心

4. 作业/任务租统计

dataX CDC:任务组的统计也是作业统计,因此不需要聚合为作业统计,直接拉取复制到prefix,latest/prefix按设定的时间段计算速率;cdc没有最终summary,持久库保存多个summary,设定保存时限自动删除

dataX:设计比较复杂,参考《分布式datax详细(落地)设计》

5. 分片策略

dataX CDC:eager模式,一次分配完分片,尽可能早地执行所有分片,达成用户触发时间要求

dataX:on demand模式,按需分配工作量,获得更小的总体执行时间

6. 故障转移

两者故障转移机制一致,节点下线,其他节点接替分片,不同的有两点,

6.1 CDC处理事件流,事件流经channel,需要可靠channel,节点切换后继续执行未ack的channel内的事件,作业整个分片重新处理;

6.2 cdc持续运行,需要保证一定的空闲节点数,需监控告警空闲节点数

Ø 动态分片新增和撤销

分布式时间槽支持cancel分片,设置cancel标志,待下次重启删除cancel分片,但cdc一直运行,该功能不能生效,因此目前未有动态分片新增和撤销

Ø znode结构

setl_cdc 根节点,可以看成一个用户域,用户定义

cdc_watcher 观测节点,一个域只需一个观测者,其分片与域内所有作业的所有分片一一对应,用户定义

cdc1001 逻辑作业,其分片是datax CDC作业,job模式的datax Engine,这里有两个作业概念,分布式作业dataX作业,域内可有多个作业,每个不同作业类型,上图两个分片对应两个dataX CDC作业,分片名称 userId+作业名称;

Ø 作业/分片的配置

观测分片配置是任务组统计的reids key前缀

dataX CDC作业配置,dataX作业jobId,作业配置,下图配置是本地文件,若使用配置中心,url 以 ”cc://” 开始

8 架构质量设计

Ø 可靠性 数据变更事件不丢失,数据变更事件至少消费一次,但允许重复处理

依赖可靠channel,只有事件处理完毕ack,源头事件偏移更新,否则,事件重发,事件处理器需保证幂等性

Ø 高可用 变更事件处理是按顺序,只能单个线程按顺序处理,高可用是主备架构,处理节点失效,备用节点激活无缝接上失效节点

这里涉及两个关键点,处理状态恢复, 包括debezium位点存储文件和数据库schema文件;处理节点失效发现,备用节点激活

Ø datax writer 数据变更事件允许重复处理,writer需要考虑幂等性

附录

Dbz引擎配置示例

几个重要的配置

connector.class 数据源,mysql,oracle等

offset.storage.file.filename 偏移存储文件位置,通常多个CDC作业,需启动多个debezium引擎,文件路径要区分,同样,databases.history.file.filaname 数据库schema文件位置

database.server.name 每个引擎配置不同的名称,否则出现异常:

javax.management.InstanceAlreadyExistsException: debezium.mysql:type=connector-metrics,context=schema-history,server=localhost

展开阅读全文

页面更新:2024-04-02

标签:分布式   增量   关系   作业   节点   完整版   组件   目标   模式   事件   数据

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top