本篇是『搜索/推荐工程』系列文章的第一篇,整个系列大致会围绕搜索和推荐工程中数据的产生、处理、迁移同步和应用展开,本篇主要阐述数据产生和迁移同步这两个环节。

背景及需求分析

对于搜索和推荐来说,没有数据就像是无源之水无本之木,而这里面的数据又可以粗略分为两大类:业务数据和行为数据。行为数据通常走 Kafka 这种 MQ,业务数据则通常走 MySQL 这种关系型数据库,对于搜索来说,通常是基于开源的 ElasticSearch(ES) 这种专业引擎进行业务开发。那么问题来了,这里涉及到三种不同的存储系统,而搜索又比较依赖前两者,寄希望于业务开发双写 DB 和 ES,较不现实,因为对于大部分团队来说,业务开发和搜索/推荐往往是两大团队,另外双写需要处理 ES 写失败等场景,同时 ES 的一些优化不可能总是去改业务代码,这些都给业务端快速迭代造成了不小的障碍。

考虑到搜索推荐的实际业务场景,一般都不需要强一致性保证,而且延时在秒级是可以接受的。那么根据这个特点,很自然地就能联想到可以通过 DB 的 binlog 和 MQ,构建一套异步实时处理系统来保证不同存储间的最终一致性,并且这种延时通常可以控制在秒级,对搜索推荐场景的时效性是足够了。

搜索/推荐数据流架构设计

这里以典型的 MySQL 和 Kafka 为例对数据流进行说明,下面的架构图基于我和团队从 0 到 1 构建某互联网金融 APP 的搜索/推荐架构图做了适当增改而来,其中包含了数据链路中的产生、处理、分发和存储的全过程。

搜索/推荐数据流架构图

下面我将围绕架构图来逐一说明组件选型细节。

数据采集

从最底层数据采集逐个往上说起,业务数据一般由业务开发团队开发并维护,比如用户的基本信息,购买/收藏/互动的产品信息等,这类数据一般维护在关系型数据库中,比如 MySQL,结构化比较强,读取使用比较方便。行为数据一般是一些类似点击,滑动,关闭等行为,这类数据往往需要前后端一起埋点,由于与用户行为息息相关,前端如 Web/iOS/Android 这类的埋点工作相对多一些。

对于埋点/打点的工具,有些会选择通过后端提供 HTTP 接口发送埋点数据,有些则直接通过 Kafka async producer 这类对后端友好的方式发送。考虑到各端开发成本和数据字典沟通成本,我建议所有开发团队使用专业的的埋点 SDK,给业务开发和后续数据对接打下良好的基础。前期数据生产不规范,后期使用起来要人命。目前国内较为专业的埋点 SDK 有提供 SAAS 服务的神策分析,也有专注于日志采集的阿里云日志服务(SLS), 这两大产品均有专业的埋点 SDK,能覆盖几乎所有主流语言。对于神策来说,他们家提供的远不止埋点 SDK,提供的实际上是整条数据链路,直至最后的分析和使用。如果真的需要自研埋点 SDK,我相信神策和阿里云的这些 SDK 实现是一个比较好的参考。

binlog 事件流

随着互联网业务的蓬勃发展,可以看到如今的一个趋势是实时流处理越来越大行其道,越来越多的一个需求是将数据库中的数据变更同时以流的形式向外推送,于是乎基于 binlog 的流式方案也是越来越多,功能也越来越强。就我所用过的两个工业级别的 binlog 流式工具来说,一个是国内阿里的 canal, canal 最近几年更新也比较多,功能也多了起来,比如直接写 Kafka/Elasticsearch. 另一个则是 maxwell, 将 binlog 转换为 JSON, 并可以自行配置输出 Kafka 等 MQ。曾经为了同步到 Elasticsearch 还短暂用过的还有 siddontang 的 go-mysql-elasticsearch. 下面简单说说这几款工具的优缺点。

  1. canal: 总的来讲使用这个方案问题不大。
    1. 优点: 国内阿里程序员主导开发,国人熟悉,使用也很广,文档还行。
    2. 缺点: 协议还是略显复杂,安装使用起来稍显麻烦,直接写 Kafka 等方案还需要进一步完善。
  2. maxwell: 个人偏好这个
    1. 优点:国外 zendesk 开源,项目代码质量不错,官网文档清晰,配置灵活,同时对于直接写 Kafka 的话运行起来很简单,输出的 JSON 也很好上手。
    2. 缺点:暂无官方 HA 实现,不过一般来说出问题往往是 offset 或者 schema 有问题,即使实例本身有问题,健康检查后自动重启了
  3. go-mysql-elasticsearch: 部署简单,适用于仅需要单表同步到 ES 的场景
    1. 优缺点同样突出,不赘述
  4. SLS logtail: 阿里云商业产品,应该是基于 canal 做的
    1. 可以一试,但整套服务需要都跑在阿里云上了,MySQL binlog 接入 上可以详细看看,实时性可以问下客服
  5. Debezium: Redhat 支持开发,大量复用了 Kafka Connect 能力
    1. 优点:支持的异构存储特别特别多
    2. 缺点:需要更多生产实践,其中关于如 bootstrap 的设计我觉得和实际生产使用不符,锁表这种行为肯定是不能接受的!

因为个人偏好 maxwell, 这里多说几句,官网文档写的真的不错,建议通读一遍。另外为了防止写入 Kafka 不同分区比例严重失衡,建议使用参数 --kafka_partition_hash=murmur3, --producer_partition_by 参数按需选择,建议至少要包含主键,加入库和表信息也可以。如果用的是阿里云 RDS,需要屏蔽部分表信息,具体忘了,可以 aliyun 关键词看下 issue.

对于 HA,可以尝试自己基于 Zookeeper 定制,这个 maxwell HA PR 可以参考。其实真出问题了,这种 Active-Standby 往往也于事无补,实例有问题,健康检查机制可以自动重启实例。实例逻辑无问题时,需要看看日志中 binlog offset, schema 等是否有问题,实在有问题的时候,简单粗暴的方案是 maxwell 库重建即可。但需要注意这段时间的增量数据丢失情况,理想情况下需要确定问题再找到对应 binlog offset 修正 schema 并重新消费。maxwell 的其他细节使用,中文版还可以参考 maxwell 详解

除了以上几种工具,实际上还有个比较有希望的的开源项目,那就是 Netflix 即将开源的 DBLog 和平台级方案 Delta. Debezium 最近基于 Kafka Connect 做了很多 connector, 可以说是很全了,但是关于DBLog 是真正从生产实践角度出发设计的 CDC 系统,其中用到的 binlog 解析库和 maxwell 师出同门。期待 DBLog 正式开源的那一天,从此一统开源 binlog CDC 领域 :)

最后再说说阿里的日志服务 SLS,其实个人觉得 SLS 能实时接入 binlog,同时结合已有的函数开发能力,将会是一个非常值得期待的轻量级 CDC ETL 平台!然而就目前的关于函数消费的 文档来看,还只能定时触发消费,无法实时消费,实在是一大遗憾。

实时计算

这一层所谓的『实时计算』可以包含很多概念,如基于行为事件的无/有状态计算,计算近 1/12/24 小时播放量,部分轻量级实时推荐策略等,亦或是由 binlog 实时流触发的宽表部分字段更新。Kafka Stream 的出现让这些轻量级实时数据处理变得更为容易,水平扩展得益于 partition 的水平扩展机制,也比较容易。与其动不动就上重量级武器如 Flink/Spark Streaming, 倒不如好好思考下一些简单的实时事件处理基于 Kafka Stream 是否更容易迭代和部署。Kafka Stream 的使用极其简单,Kafka 提供的是非常轻量级的库 kafka-stream, 业务逻辑写起来非常简单,基本没有诸如 Spark/Flink 之类的学习成本,而且可以和普通应用一样部署在 K8S 上,部署负担小很多。写过 Spark streaming 或 Flink 的人应该能明白我说的轻量级实时处理直接用 Kafka Stream 的好处了。

数据仓库

对于推荐,实时推荐由于获取的信息比较局部,难以非常精准,一些特征往往还需要离线运行一些重量级机器/深度学习任务才能跑出来,这个时候自然需要业务和行为数据。传统的数仓方案为自建一套大数据平台,然后招一个团队来运维这套大数据平台。根据笔者的实际工作经验来看,对于大部分中小公司来说,这种投入产出比太低了。大数据平台所需的硬件成本不说,平台上一堆软件需要运维,而且动不动就是小文件或者数据倾斜问题,分析人员写出的 SQL 性能也是惨不忍睹,集群的资源能被耗光也不见得有多少产出,还容易遭到用户吐槽…

自从在阿里工作体验过 Dataworks, Maxcompute 这一系列组件后,真的感慨比开源的 HDP 那一套好用多了,成本也不高。如果再让我选型一次,数据仓库这一组件我会毫不犹豫选择阿里云的 Maxcompute, 而不是自建大数据集群然后基于 Hive 开发一系列离线任务。配合 Dataworks 开发套件,整个体验不要太爽。

为什么需要宽表

其实在我设计第一版同步组件 MySQL ==> ES 时,并没有加入宽表这一层,直到来字节跳动工作后,和同事讨论过后,越发觉得宽表这一环节对整体环节十分重要。优点如下:

  1. 直接映射 ES 的文档设计
  2. 作为 Redis 的缓存数据来源之一
  3. 作为数据平台的来源之一,虽然也可以将所有业务表都导入数据平台,再做 ETL 产出宽表
  4. 数据迁移时不用重复计算,设定 QPS select 宽表即可

数据迁移与同步

这个过程在关系型数据库和数据仓库间发生的频次尤其地多,比较原始的工具有 sqoop, 高级一点的有基于 binlog 自研的,如美团的 数据仓库同步架构于实践, 字节跳动内部的同步也有异曲同工之妙,这种方式对于大表非常友好,节约资源。如果使用商业方案,阿里云的 Dataworks 中可以建一堆不同类型的同步任务,如果要更高级一点的,还有 DTS。所以中小公司基于阿里云来构建数据平台这一套服务是非常省时省力的,成本也不高。

说完了上面的离线场景,我们再来说说搜索和推荐的实时同步场景,开源的方案有不少基于 Kafka Connect 来设计的,自己只需要写 source, sink 的逻辑就好了,设计好协议两边就能对接了。前面提到的 Debezium 也可以考虑。如果数据量特别大,基于 Flink connector 来设计也不失为一种好的方案。

Octopusflow - TODO

最后出场的是自己正在着手设计的 Octopusflow 寓意是能像八爪鱼一样打通各种数据链路。之前在 Go 和 Java 间摇摆不定,现在还是觉得复用 Kafka Stream/Connect 来做落地最为容易,Java 对大多数人也算熟悉。