实时湖仓一体在腾讯的落地实践

需要进行数据处理的公司在湖仓演进的架构选择上都十分相似。起初,首选方式是数仓架构,比如teradata 、greenplum或Oracle等。通常数据处理的流程是把一些业务数据库,如Transactional Database等,通过ETL的方式加载到Data Warehouse中,再在前端接入一些报表或者BI的工具去展示。,自Bill Inmon提出数仓概念以来,从90年代的美国到国内,数仓架构一直是一个比较经典的架构,它可以高效处理结构化的数据,而且性能好、速度快。尤其是teradata,它是存算一体的架构。,但是随着业务类型增多,我们需要扩展更多的业务场景,如数据科学或机器科学领域等。数据类型和数量也随之增多,结构化数据在互联网领域只占很小的一部分,还有很多半结构化、非结构化的埋点日志和音视频数据等。,我们的数仓已经无法处理更多数据,一些新技术,尤其是开源等多个领域的大数据技术开始涌现。,我们逐渐将架构划分为数仓和数据库的双层架构,把数据先加载到数据湖中,通常我们会选择Hadoop数据库作为自建数据湖。如果要做高效的查询或者报表的输出,我们会对数据再加工,放入高性能的数仓中,如ClickHouse或Doris等。,大概从2010年开始,随着Hadoop的盛行,绝大多数互联网公司都在用这样的架构。大家如果使用过Hadoop,相信也能感知到它可以支持各种不同的场景,基本上能够满足所有业务场景。,缺点:,图片,这个架构整体偏离线处理,随着流式框架的引入,大公司整体的数据处理架构在2015年后就变成了仓、湖、流三种架构。,根据不同的场景选择不同的架构,比如我要做一些Ad-hoc的场景,我们会选择在仓里面进行;如果要做一些定时的报表或业务报表,则用Spark;如果想要做一些流式数据的查询和分析,则可以用Flink之类的工具。,这个架构存在几个问题:,大概于20年左右提出了湖仓一体的架构,试图用一个统一的湖上建仓或湖仓一体的存储架构,解决数仓和数据库的问题。,针对传统意义的数据湖,若在对象存储或者Hadoop上能够构建出具备数仓语义的一个格式,使得我们在湖上的格式有更强的能力去做数仓,则需要具备几个条件:,图片,前文讲述了湖仓一体技术所要具备的几个特性,如今在开源领域主要有三种技术拥有这些特性,分别是:Hudi、Iceberg和Delta Lake。,它们的功能整体上比较接近,都是一种数据的组织方式,即定义了一种表的格式,这个格式主要是定义数据的组织方式,而不是确定一种数据的存储格式。与一些纯粹的数据格式或Hive表(Hive 3.0版本前)相比,它提供了ACID事务能力,这样就具备了仓的能力,它可以提供一些事务的特性和并发能力,还可以做行级数据的修改、表结构的修改和进化,这些都是传统大数据格式难以完成的事项。湖仓一体技术出现后被业界迅速采用,从21年开始就进入了Gartner技术成熟度曲线的评估。,图片,下图左侧是我们一个旧的数据管道。举个例子,要收集一些Spark的审计日志以观察每天的情况,那么我们就可以把Spark日志都导入到消息队列中。在腾讯内部使用的是TubeMQ,然后我们有一个服务TDSort用于归档,把数据按照小时或者天的时间格式分类,紧接着保存至HDFS上,再启动一个Hive的命令,把它添加到分区内。,图片,前面是通过流式进入,后面是批的落盘,整体设计比较复杂。为了保证exactly-once以及保证流转批的可见性,我们在原子性上花了很多心思,因为在原先的架构上我们缺乏事务的能力,所以我们通常依赖HDFS的原子性来保证可见性。,之后我们把整体架构迁到了以数据湖格式为体系的另一套架构中,选择用Flink来做流式的入湖,把它写到HDFS上,这样整体链路就变得更为简单。对于Flink写下的数据,我们主要选择的是Iceberg,在Flink读取把它写到Iceberg中,下游就能直接可见。,至此,原先T+1的可见性就变成T+0,这个是最典型、最常见的一种使用方式。这也是我们内部像广告和视频号等业务的主要使用方式,把小时级的数据可见性降低到分钟级的可见性。,CDC在腾讯内部不算是非常大的场景,但原本通过拉链表方式去构建,会带来一些问题:一是延迟,二是后续的处理流程非常复杂。,我们现在改成了另一种方式,使用Flink的CDC Connector,再加上Hudi。因为针对CDC而言,Hudi在这方面的能力比Iceberg更成熟,所以选用Hudi而不是Iceberg。,有两种方案,一种方案是直连MySQL或PostgreSQL等类似的数据库,另一种是通过消息队列的方式,通常都是使用第一种方式,这也是比较常见的一种内部形态,与前面相比Flink CDC connector与MySQL直连获取binlog。,图片,在业务侧使用整套湖仓一体技术后,从原先的Lambda架构转换成了湖仓一体的架构。在原先的架构中,流和批分离,流主要是用消息队列来做流式的Pipeline的构建,还有一条离线链路做数据的回补和对账等。但是离线存在于HDFS上,这样就会导致两条链路要做同一份数据的处理。,使用湖仓一体就相当于把它们合并,我们在ODS、DWD或者DWS层统一用Iceberg来进行流式写入。在流式写入后,可以在每一层中做离线或者批的分析,也可以一直做流式分析,因此同一份数据既做到了流式的读和写,又做到了批的读和写,一份数据就可以适配整个场景,不需要存多份数据或者接多条ETL Pipeline。这就是我们比较典型的一个架构,腾讯视频也是在这个架构基础上做演进。,图片,回到湖仓一体的本质,即使我们不需要上述的特性,相比传统的Hive表,它也带来了很多新的特性和能力。用于取代离线的场景化,也会有更好的效果。,数据治理:,数据查询:,图片,随着湖仓一体实践的逐渐深入,尤其是当单链路的数据量达到分钟级,每日达到万亿规模时,湖仓一体的性能问题就要格外重视。,批处理希望能够有更多的数据块聚合在一起读取,做到更多样、更大的吞吐,流则需要更快的响应。,图片,抛开内核,无论是Iceberg还是Hudi,本质上都是海量文件的组织方式,无法摆脱存储的限制,我们通常会把它存到内部的HDFS上,云上则会存到对象存储中。但对象存储也有它的限制,吞吐量较大,但延迟会较高。,如果需要流读,我们通常在构建实时链路的时候,会选择消息队列,它的存储模型完全不同,是低延迟高响应,顺序读写。它的存储能力决定了计算,流式计算的访问方式和离线计算的访问方式不同。,这个时候就会出现两个问题:,图片,总体来看,设计出这些特性后,测试数据显示,我们内部的TDW与Spark相比,性能大大提升。,图片,Snowflake或者Redshift之所以那么快,很重要的一点是因为它有索引,但我们传统的Hive表几乎没有索引。Iceberg具备了构建索引的能力,也具有ACID能力,而且它的表结构也更复杂,所以我们能够构建索引。,具体成果:1)引入一个索引框架;2)构建了不同类型的索引。,我们做的是全局索引,针对每个Data File生成对应的Index File。Index file与datafile绑定,内部有一套系统会异步更新或者生成Index。我们选择Puffin作为存储的格式,它是Iceberg定义的一种Index的存储格式。我们也改造了一定的语法,使得它能够支持索引的生成。,图片,整体完成后,我们有一个点查的场景,bloom filter就比较适合点查的场景,速度与原来相比有一个数量级的提升。,图片,我们在使用湖仓一体技术的时候,流式的性能已无法实现突破,因为受制于底层的存储,使用HDFS或者对账存储则缺乏更低的延时,所以我们也在参考社区的方案。,Flink社区提供了一个Flink Table Store的方案,把流存储和批存储融合为一体,现在改了名字,叫做Paimon,我们参考其做了类似的方案。在这个方案中,流和批选择了不同的存储,流选择使用消息队列,批则是底层使用数据湖的格式,封装在一起就成为了流批表。有了流批表,则能够对外提供统一的流和批的读写接口。,我们主要是对接Flink的场景,写的时候我们会双写到LogStore和Filestore这两个系统中,根据不同的场景读不同的系统。如果是流式则读LogStore,批则读Filestore。,优点:,图片,我们引入了自动数据治理的概念,它与传统的数据治理方式的区别在于它基于事件驱动,而不是基于时间定时完成。其具备以下能力:,具体的运作步骤:它会在Iceberg的存储中收集一些事件,根据事件分析当前要进行的操作,然后根据规则来生成这些操作。,图片,在做小文件合并时,如何生成这些规则?,传统意义上的小文件合并,通常来会设定一个时间点,比如每隔一小时或者每隔一天做一次,但这样会产生很多无效的作业。若你的写入很快,那么可能会有大量的堆积,若你写入很慢,那么就可能有很多无效的合并操作。,我们通过收集每一次commit后写入的增量,求均方差,判断当前是否达到阈值。若未到阈值,我们会逐步更新它的均方差。如果达到阈值,就会触发一个小文件的合并操作,根据事件来驱动。这样的形式会比先前的方式更能节省资源,效率也更高。,图片,现在社区也有,但我们更早开始,它主要是能够做到加速多维查询,把相关的record归类放在一起。我们会通过事件收集相关性极高常被查询的列,自动给用户推荐可以重排列的数据,并询问是否需要重排列。当用户决定重排列,数据就会进行增量,做后续的重排列,这样就能提高数据整体的有效过滤率。,图片,我们对Iceberg引入了一个索引框架,支持bloom filter 和 bitmap的构建,但是用户并不知道如何使用索引。所以我们提供了自动索引的构建能力,会根据查询的信息分析出哪些列的用户查询频度较高,接下来我们会优先在这些列上构建索引。同时,我们选择了根据分区的增量来加theta sketch的方式来做增量的索引,而不是每次都做全表索引的重构。构建索引后,Iceberg的常用性能会出现一个大的跃升。,图片,我们希望湖仓建设从原先的准实时湖仓向实时湖仓的架构迈进,也希望湖仓一体架构在经过元数据、缓存和索引的优化后,能够解决交互式查询和流的所有场景问题,用一套存储应对所有的场景。这是我们现在在做的事情,也是未来的目标。,Q1:前面提及CDC的构建,是按照整库入仓还是按表的方式来进行?,A1:我们腾讯这边的量不算大,我们内部主要还是以append方式入湖,CDC则仍是按表的方式来,没有做太多的优化,也没有涉及整库的方式。,Q2:您提到小文件合并,具体的优化是指要另起一个旁路作业,还是指将这部分的功能并入到写入的流程里?,A2:我们采取离线和异步的方式,因为如果并入到写入的流程,会对整体写入造成拖垮或者堆积效应,所以根据我们内部的实践以及单链路1000多亿的日均写入的经验,同步写入和合并的这种方案并不可行,所以我们做的是异步方案。,Q3:有些场景会选择Hudi,另外一些场景选择Iceberg,请问Iceberg和Hudi的选型依据是什么?,A3:我们八成以上的场景都选择了Iceberg,因为我们投身及使用Iceberg社区的时间较早,所以对Iceberg的的整体把控会更好。只有涉及CDC的场景,我们才会用Hudi,因为Iceberg当前的CDC能力不够成熟,但我们也在探索和建设Iceberg的CDC能力,包括全局索引的能力、部分列的更新能力等,也是为了全链路CDC所做的优化。如果未来Iceberg具备这样的能力,我们应该会统一使用Iceberg,因为维护多套系统会增加维护的成本。其实这两个技术没有太大差别,只需选择一种即可,实际上社区的演进最终都会趋同。,Q4:Iceberg上有Spark和Flink等多个引擎,假如我建了一个Iceberg表,可以用Spark和Flink两种引擎同时访问底层的表吗?,A4:可以。因为它有所谓的事务的语义。这也取决于你的锁如何实现,默认使用比如HiveLock等可以做隔离,所以能够多引擎地去写,但会有一定的冲突概率。但针对读而言,因为Iceberg生成的每一个副本都是只读的,所以多引擎去读没有任何问题。,Q5:数据湖在应用侧的使用场景有哪些?,A5:数据湖从20年初引入到现在,在腾讯内部每年至少有10倍以上的规模增长,所以现在几乎所有的业务线都在使用。最大的业务线一般是视频号或者广告之类,也有其他的业务,基本上所有的业务都在用数据湖,无论是用于加速数据的可见性、构建CDC还是用Iceberg替代Hive表的低效查询,都会带来一定的性能提升,这些场景前文有所提及。,图片,邵赛赛,前腾讯实时湖仓团队负责人,现Co-Founder & CTO of Datastrato。Apache基金会成员,Apache Spark Inlong Livy PMC成员,曾就职于Hortonworks、Intel,10年的大数据从业经验,专注于分布式流批计算引擎的研发和优化。,实时湖仓一体在腾讯的落地实践

文章版权声明

 1 原创文章作者:cmcc,如若转载,请注明出处: https://www.52hwl.com/27563.html

 2 温馨提示:软件侵权请联系469472785#qq.com(三天内删除相关链接)资源失效请留言反馈

 3 下载提示:如遇蓝奏云无法访问,请修改lanzous(把s修改成x)

 免责声明:本站为个人博客,所有软件信息均来自网络 修改版软件,加群广告提示为修改者自留,非本站信息,注意鉴别

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2023年6月23日
下一篇 2023年7月15日