概述
最早接触doris是在2020年初,当时是为了解决在海量数据上实时高并发查询的问题,当时调研了很多框架,在使用这Doris之前我的架构和其他公司的架构基本差不多,Hadoop,Hive,Spark,Presto,但是这些都满足不了我的需求,在调研Clickhouse的时候,发现了Doris,看网上介绍从性能上,并发性及易用性上都非常好。在深度做了测试之后给我的是更大的惊喜,我之后就将我的架构全部转向以doris为核心去构建。同时也深度参与到社区,提了一些RP去改进Doris。
业务介绍
蜀海供应链成立于2011年6月,是集销售、研发、采购、生产、品保、仓储、运输、信息、金融为一体的餐饮供应链服务企业,现为广大餐饮连锁企业及零售客户提供整体食材供应链解决方案服务。
我们主要业务业务如下图:
- 领导驾驶舱数据实时分析,T+1报表分析
- 财务各类日报,月报,对账等
- 客户销售:采购类实时报表、日报、月报各个维度的数据分析及查询销售明细数据
- 供应商类:采购分析,供应商对账分析,采购策略优化等
- 仓储库存周转、库位、实时库存等各种维度数据指标及报表需求
- 运输类:准点率、温控、运输成本,调度等分析
- 数据分析师:快速响应各种数据分析需求,及高层领导各种临时数据需求,数据挖掘及各种实时交互式分析
- 各业务运营/策略/负责人主要查看各自业务运营的整体情况,查询数据中台的各该业务各种维度实时聚合数据
- 还有一些其他业务的数据报表及分析需求
作为公司的大数据团队主要负责构建公司级的数据仓库,向各个产品线提供面向业务的数据分析服务,如:销售额,毛利,库存周转、客诉问题、销售达成率、物流准点率、智慧工厂、供应商等业务线,在过去半年多的时间里我们通过对Doris的应用实践,基于Doris构建了蜀海的实时数据仓库。
本文总结一下我们在这期间的工作,和大家一起分享,共同探讨。
我们的数仓分层:
大数据团队主要负责到ODS-DWS的建设,从DWS到ADS一般是数仓系统和业务线系统的边界。
在过去,由于缺失统一的数据仓库,业务系统之间又相互依赖,业务系统那边也探索了很多模式来支持各个业务线发展。但是效果都不是很好,出现各种各样的问题,随着业务的发展,数据量也越来越大,之前的模式也越越来越不堪负重。
架构演进
架构1.0
我来公司之前这边大数据团队规模也比较小,业务量也没那么大,当时只是为了支撑海底捞门店补货系统(几家门店试点),搭建了基于CDH一套大数据平台,主要是为了完成每十分钟给补货系统推送一批计算好的数据,主要是POS销售数据,沽清数据等,每天在全量的推送一次一整天的数据,当时的架构如下:
这是当时的大数据架构,为了解决这个问题,基本上能用的组件都用了,但是数据的实时性还是满足不了需求。
这个架构存在的问题:
- 数据生产链路太长:中间经过了三四次落地操作
- 不支持标准SQL
- 数据冗余太多:数据从阿里云到最终输出,中间保存了三份同样的数据,
- 聚合查询效率不高:十分钟的数据基本要三四分钟甚至更长时间才能计算出来结果
- 开发成本太高:基本把CDH套件里的组件全部使用了
- 不能快速的响应业务各种数据需求
架构2.0
为什么选择Doris
我来蜀海之前就是Doris的用户,深知Doris的有点,在这里还是说一下。
对我们用户来说,Doris 的优点是功能强大,易用性好。 功能强大指可以满足我们用户的需求,易用性好主要指 兼容 Mysql 协议和语法,以及 Online Schema Change。 兼容 Mysql 协议和语法让用户的学习成本和开发成本很低, Online Schema Change 也是一个很吸引人的 feature,因为在业务快速发展和频繁迭代的情况下,Schema 变更会是一个高频的操作。
对我们平台侧来说,Doris 的优点是易运维,易扩展和高可用:
- 易运维指 Doris 无外部系统依赖,部署和配置都很简单。
- 易扩展指 Doris 可以一键加减节点,并自动均衡数据。
- 高可用值 Dors 的 FE 和 BE 都可以容忍少数节点挂掉。
基于Doris的数仓架构
如下图:
- 数仓整体以Doris为核心构建公司企业级数据仓库,(
- 通过统一的数据采集系统,多种数据采集手段,包括Mysql binlog解析(Cannal),日志采集Flume(Doris审计日志)、埋点接口等实现多种异构数据的采集
- 将采集的数据统一通过消息队列(Kafka)完成高并发的数据吞吐,同时实现数仓及计算引擎的解耦
- Flink计算引擎完成数据的ETL处理及实时数据的统计,并将数据推送到Kafka及Doris(Stream Load)
- 对外通过doris和消息队列对外提供数据服务
- 数据质量管理是实现对从数据采集到数据ETL处理,数据存储及数据服务全生命周期的数据管理,包括元数据,数据质量,数据规范、数据安全
下面这个图可能更清晰的看清楚我们基于Doris的数据流向
新架构优点
•数据导入方式简单,我们针对不同业务场景使用了三种导入方式
1.Routin load : 异构实时异步数据导入(这种主要使用在一些临时的数据接入)
2.Stream Load :业务数据实时异步数据接入(我们封装了业务数据零代码入仓,后面我会介绍)
3.Insert into :定时通过 DWD 层数仓表生成 DWS/ADS 层数仓表
•数据链路缩短,数据实时性更高
•数仓使用成本降低
- Doris 支持 MySQL 协议,数据分析师可以直接进行自助取数。
- Doris 不依赖 Hadoop 生态圈组件,降低了架构的复杂度,大大降低运维成本
- Doris 同时支持离线批量和实时数据,一个框架搞定一切
我们做的一些工作
我们基于Doris开发了自己的数据中台,主要是为了解决以下五个问题:
- 找数:要知道数据从哪来到哪去
- 理解数据:这个数据是干嘛的,每个字段是什么意思
- 问题评估:一个新的需求过来,怎么评估开发,会使用到哪些数据,这些数据关系是什么
- 取数:上面问题都解决以后,数据怎么获取
- 数据可视化展现:这是最终呈现的结果
围绕上面这五个问题,我们设计和开发了公司的数据中台,下面主要介绍基于Doris开发数据中台过程中我们做的一些工作。
零代码入仓
要做到业务快速响应,首先你要能实现数据的快速数据接入,包括数据接入过程中的 ETL
我们这边业务库基本都是 Mysql,也有非 Mysql 数据库,基于这种情况,我们采用了 Canal 及 Datax 完成数据采集,同时对 Datax 进行了改造,是 Datax 抽取的数据格式和 Canal 一致,然后通过 Flink 基于 Doris Stream Load 完成数据入仓操作,整个过程可以零代码完成,并集成了我们自研的规则引擎,实现规则自定义及规则自动下发到Flink Job中,具体展示效果如下:
数据分析人员可以通过Web方式零代码完成业务数据接入,最后提交任务即可。
•数据接入
目前支持 Mysql,Kafka,Datax 这里我们采用的是Canal实现对Mysql binlog进行监控,然后将mysql的数据实时推送到Kafka
接入任务可以监控,接入数据量可监控
•数据入仓 数据接入到Doris数据仓库对应的表中,这里我们采用的是Flink实时消费KafKa的数据,然后通过Doris的Stream Load完成。 Flink消费Kafka数据我们支持两种方式:
- 指定Kafka Topic的Offset进行消费:kafka.offset
- 指定时间戳的方式:kafka.timestamp
•数据丢失的问题 针对Flink Job失败,可能会造成数据丢失的问题,我们解决方案如下:
- 如果你记录了失败的时间点的Kafka Offset,可以通过配置文件配置这个参数来重启Flink Job就行。这样不会造成数据丢失
- 如果没有记录这个offset,可通过指定consumer.setStartFromTimestamp(timestamp);这个时间就是在配置文件中配置的时间戳,这样无论是通过offset还是从指定的时间开始消费Kafka数据,都不会造成数据丢失
•数据重复问题
因为我们这个是在数据接入层使用的,数据是进入到数据仓ODS层,在这一层我们采用的是Doris Unique Key模型,就算数据重复入库,也会自动覆盖原先的数据(这是Doris Unique Key模型的特点),不会出现数据重复的问题
自研规则引擎
提供内置规则模板,及规则定义可视化开发界面,规则试跑,规则发布等,发布的规则会自动下发到Flink Job对应的作业中执行。
数据地图
基于Doris我们实现了元数据管理(业务元数据及技术元数据),提供物理元模型及血缘元模型的构建,提供一键搜索的数据地图服务。
血缘关系及关联关系
血缘关系:通过解析Doris 审计日志,自动化完成
关联关系:这个主要是在ODS(贴源层),因为业务系统数据库表没有主外键关系,在这里我要知道数据之间关联关系,通过手动定义维护
数据服务开发
我们为了快速响应业务系统的数据服务需求,设计开发了接口零代码开发平台,数据分析人员不需要写代码就可以快速完成API接口的开发,可以对接口进行可视化上下线操作,接口调用限制(黑白名单),支持降级限流熔断等,快捷方便,高效。
接口定义开发
数据指标管理
在指标系统完成基于审批流程的指标规范化定义,严格定义指标规范,规避指标二义性;支持和其它产品联动影响和展示,产品如模型设计中心、数据地图等
调度系统
我们将海豚调度深度集成到我们的数据中台中,各个模块可以很方便的将任务添加到海豚调度系统中运行及监控。
Flink Doris Connector
为了让 Doris 更好的适应各种异构数据的融合分析,使用大规模分布式环境下的机器学习场景及实时数据分析的场景,我们设计并发了 Flink Doris Connector,同时贡献给了社区。
详细的设计方案可以参考我的博客:
[ Flink Doris Connector 设计方案] https://my.oschina.net/u/3774656/blog/5017244
收益
目前10台BE ,3台FE(高可用)的 Doris 环境,效率、性能表现情况如下:
•支撑十几条业务线以上,整体响应达到ms级。
•支持百万、千万级大表关联查询,同时进行维表关联的雪花模型,可以实现秒级响应。
•日级别,基于销售明细现场计算,同时满足汇总及下钻明细查询,查询时效基本都可以控制在秒级。
•七日趋势分析,100毫秒-3秒,根据集群规模不同查询性能有所区别,但数据量较大时,调动的集群资源较多,因此MPP的并发性能受限于集群的性能。一般原则是并发较高的业务,需要严格控制查询时效(基本在毫秒级),对于并发不高的业务,允许进行较大的查询,但也要考虑集群的承受能力。
•通过应用以及Doris的不断改进升级,Doris的高可靠、高可用、高可扩展性也得到进一步验证,服务稳定可靠。
•入库性能,每秒峰值可以在30-40万条,同时不影响正常的数据分析
开发效率的提升:
•数据接入:整库几十张表在一天之内可以完成接入到数仓(包括ETL),无需代码
•报表开发:效率提升至少2-3倍,大大节省人力资源,而且不需要开发人员,只需要数据分析人员通过sql加工,然后拖拽即可完成数据可视化展现
•数据实时性:满足业务各种实时准实时数据分析需求,并且能做到快速响应