去评论
dz插件网

腾讯云数据库DTS发布全新数据集成方案:全增量无缝同步,快速构建实时数仓

迪巴拉
2023/08/02 23:40:35
随着IT技术与大数据的不断发展,越来越多的企业开始意识到数据的价值,通过大数据分析,可以帮助企业更深入地了解用户需求、更好地洞察市场趋势。目前大数据分析在每个业务运营中都发挥着重要作用,成为企业提升市场竞争力的关键举措之一。通常企业会构建数据湖仓,将多个数据源通过数据集成技术,汇集一起进行数据分析。由此,数据集成成为了构建数据湖仓的必经之路,然而企业在数据集成过程中却面临很多棘手问题。


传统的数据集成大多仅支持全量数据,对于全量+增量的一并集成,则需要分别部署链路,获取到数据后再手动合并。

表结构频繁变更,无法自动同步表结构变更到数据湖仓,手动维护成本高。另外无法”一键”整库同步,追加同步对象操作复杂等。

传统的数据集成技术建模路径较长,按照T+1的方式同步到数据仓库中,时效性差。需要做到实时数据集成和分析,才能帮助用户根据最新的数据做出更快、更准确的决策。



基于数据集成的核心痛点和用户诉求,近期腾讯云数据传输服务DTS重磅发布全新数据集成方案,该方案采取全增量数据一起的同步方式,将数据源先同步到Ckafka,再从Ckafka消费数据投递到数据湖仓,可以有效帮助用户解决数据湖仓建设前期数据集成的问题。



关于DTS

选择DTS做数据集成是因为DTS有着技术上的天然优势。

2.1 DTS简介


DTS是腾讯云自主研发的专注于数据库传输服务的工具,具有高传输性能、高可用、安全连接、操作便捷等特点,可以实现数据源在业务不停服状态下的实时数据同步,整个数据同步过程对源库业务无影响。DTS已成功应用于金融、医疗、娱乐、泛互联网等多个行业场景,帮助用户实现不同系统的数据打通和自由流动,如数据库迁移上云、数据库异地备份、异地多活等。



2.2 DTS的技术优势


首先,DTS本身已支持多种数据源的同步,涵盖MySQL、MariaDB、Percona、TDSQL-C MySQL版、TDSQL MySQL版、PostgreSQL、Redis、MongoDB、SQL Server等,对各种类型的数据库以及对应的数据格式都“了如指掌”,可以保证数据同步结果的正确性。

在DTS已有的技术积累中,已支持了无锁同步技术,即在同步过程中,不会对源库加全局只读锁(FTWRL),避免影响源库的写入。在数据同步过程中,源库若发生主从切换、重启等,任务都可以正常运行,不会被中断。这些技术都对源库非常友好,保证使用DTS同步数据的同时,不影响源库业务的正常运行。

其次,提供全增量一体的数据集成能力是当前业界的主流发展方向,而DTS本身就具备此能力,DTS在数据库之间的同步机制,原生就采用全增量无缝衔接的同步机制,既能保证数据一致性,又能保证数据的实时性。这个能力可以避免因使用不同工具分别集成全量或增量导致的难以保证数据连贯性和一致性的问题。

最后,基于DTS的操作和维护都是Web界面,用户只需简单的3-4步即可完成配置,非常便利。

基于DTS本身具有的技术优势,且技术日渐成熟和完善,加上用户对大数据集成诉求的日益突出,DTS构建数据集成的能力也就应时而生。

2.3 基于DTS的数据集成方案


DTS在做数据集成方案的初期,产研团队做了非常充分的调研,并分析出了用户的核心诉求,主要聚焦以下四个方面:


DTS的「数据订阅」模块可以应用于数据集成并分发到下游的场景中,但订阅模块主要处理增量数据,无法实现全量+增量一起同步。经过多次的技术探讨和验证后,我们最终决定基于「数据同步」模块来做数据集成,技术方案:数据源先通过DTS同步数据到Ckafka,再从Ckafka消费数据投递到数据湖仓。



不过实际落地中,我们还是遇到了一些挑战。

2.3.1 全量部分数据块很大,如何提升导出导入效率?


使用DTS数据同步模块来做数据集成,可以满足全量+增量一起同步的诉求,但在大数据场景下,又不得不面临两个问题:对于大表(如10亿行以上),如何提升同步作业效率?对于超大的存量数据,在全量阶段遇到任务中断时,如何确保数据重入?

基于以上问题,DTS设计了分块导出方案,针对大表场景(如10亿行以上),从源库导出数据时将一张大表分为多个分块,一个分块连接一个线程,这样一张大表就可实现多分块同时导出,提升大表的同步效率。

在导入到目标kafka时,也是按照分块导入的,同时这些分块都会进行标记,如果kafka发生重启,可以根据标记来识别中断的分块位置,从中断的分块开始继续向目标kafka写入。使用这个方式,在遇到kafka异常时,就不需要从头重新写,大大提升用户体验。



2.3.2 多分区,如何保证按序消费?


为了提升用户消费的速率,消息投递到Kafka时一般采用投递到kafka的多个分区的形式,多个分区可以并行消费以提升消费速率,但在多分区处理过程中,会涉及投递顺序的问题,需要保证投递到每个分区的消息与业务生产的消息顺序保持一致。

在实现中,DTS向Kafka投递消息时,按照源库日志解析后的顺序来写入,因此可以实现写入Kafka顺序与业务生成顺序的一致。

DTS在拉取源库的binlog日志时,采用单线程机制,先保证日志解析结果与业务生产顺序保持一致,等写入到kafka的多个分区时,再按照多线程并发,最终实现了每个分区的消息都是按序排列。

这里需要说明下,投递到多Topic+多分区这种形式中,每个分区内的消息都是按顺序投递的,但是多个分区同时消费时,无法保证分区间按序消费,如果用户对消费到的消息顺序有严格要求,建议选择投递到单Topic+单分区的形式。

在选择按表名分区的场景中,源库同一个表的数据变更都会投递到目标Topic下的同一个分区中,因为日志的解析是按序排列,所以投递到Topic分区中的消息也是按序排列。



总之, 不论选择哪种分区策略,DTS都可以保证投递到各分区中消息的顺序性。

2.3.3 如何保证数据不丢?

要保证同步到Kafka的数据一条都不丢,那么所有的数据就需要有迹可循,哪些已经同步过了、哪些还没有同步过,都必须清楚可查。于是DTS通过对数据做标记,标识数据同步位置,以此来实现数据准确同步。

全量阶段,数据按照分块机制进行导出导入,DTS导入到目标端Kafka的每个分块都会进行标记,kafka异常时,可以识别中断的分块位置继续导入。

增量阶段,DTS内部处理源库的日志解析时会插入标记,来识别数据写入到Kafka的位置,如果任务中断再恢复,通过DTS内部标记,可以找到中断的位置,继续增量同步。

2.3.4 库表变更,能否灵活同步?


业务数据库经常会有库表结构的变更,而数据集成需要能识别并自动同步这些变更字段,否则,库表结构每变更一次,就需要手动改一次集成程序,这个维护工作量非常大。在DTS以前的链路传输中,库表结构变更的自动同步能力就已经具备了,直接集成即可。但是我们本次需要解决的是,当同步任务已经启动,用户想要追加/删除一个新的库表对象,如何做到一键化操作,让用户便捷维护。

这里,我们以追加一个表对象为例,同步任务已经在进行中,但是运行过程中发现需要新增一个表对象(例如表A),对用户来说,只需要在DTS任务列表页,进行一步可视化点击操作即可完成。

动态修改同步对象的过程中,其实DTS底层做了很多工作,对用户操作层面进行了简化,如上述操作案例:新增一个表对象(例如表A),DTS需要同步表A的历史存量数据,同时,已有的同步任务1还不能受影响。所以在实现中,我们在DTS后台构造了一个临时任务2,来负责同步表A的存量数据,当任务2完成后,再将任务1和任务2合并,以此来实现动态追加同步对象的效果。

相对于一般的集成工具,DTS在库表结构的变更,库表对象增加/删除等方面都是非常友好的,用户只需要在Web界面进行操作,一次配置,即可享受长期便利,大大减少用户的维护成本。

接下来,给大家重点介绍DTS的数据集成方案是如何配置的。

DTS+Ckafka+数据湖仓 生产实践

3.1 实践场景


数据源头为MySQL,通过DTS获取MySQL的全量+增量数据到消息队列Ckafka,然后适配消费Demo,将消息投递到数据湖仓。



3.2 前期准备


3.3 数据同步


DTS的操作比较简单,在腾讯云Web界面进行4个步骤即可,无需环境部署。

步骤1:创建DTS任务

购买一个DTS任务,源库选择MySQL,目标库选择Ckafka。

步骤2:设置同步源和目标数据库

配置DTS连接源库和目标库,源库配置中填入MySQL的主机地址/端口/用户名/密码,目标库选择Ckafka实例ID。

这个步骤主要是验证DTS到源和目标库的网络是否打通,对应的用户权限是否满足要求,如果源库有安全组设置需要允许DTS IP访问,否则网络不通。



步骤3:配置数据同步选项

这个步骤主要是选择同步的数据格式(Avro、JSON)、数据投递到Ckafka的哪个topic下、分区策略等。





步骤4:校验任务

上述配置完成后,DTS会对源和目标库的各项参数进行预校验,如Binlog必须开启,并且binlog_format需要设置为row模式等等,以保证数据同步结果的正确性。预校验通过后同步任务就可以启动了。



3.4 数据消费和投递


 步骤1:下载消费Demo样例

DTS同步任务正常运行后,下载DTS消费Demo样例,将Demo包解压后运行,进行数据消费。

这里以Go语言为例,解压Demo包后运行 go build -o subscribe ./main/main.go,生成可执行文件 subscribe。

然后运行 ./subscribe --brokers=xxx --topic=xxx --group=xxx --trans2sql=true。这里的brokers、topic、group分别填入Ckafka的地址、消费topic名称、消费组名称。

运行结果显示如下,表示Kafka正常连接,消费链路已打通。



步骤2:测试数据结果
在源数据库上插入一条数据。



在消费端即可查看到对应数据。



 步骤3:修改Demo,增加适配到后端数仓的代码逻辑
DTS提供的消费Demo仅对数据做了打印处理,用户需要在Demo基础上自行编写数据处理到后端数据湖仓的适配逻辑。

实践效果

使用DTS同步到Kafka的链路形式替代之前使用Canal组件的链路,最终实现高性能传输、高稳定性保障的同时有效降低了运维成本。


DTS提供的同步到Kafka数据集成方案具有通用性,目前已成功应用在出行、零售、游戏、互联网、金融等多个行业,并收获了用户的良好口碑。

总结和展望

DTS目前已上线了MySQL系列数据库同步到kafka的链路,为用户在大数据集成中提供了便捷的技术通道,后续为了满足用户更多的需求和更高的使用体验,DTS将聚焦「数据库生态」和「产品体验」上持续发力。








-- 更多精彩 --


跑好最后一公里!腾讯云数据库SaaS服务,全面升级



DTS搭载全新自研内核,突破两地三中心架构的关键技术
↓↓点击阅读原文,了解更多优惠