背景

在 CDC 数据流中,部分 topic 的消息较多较杂,而我们所需的消息很可能只是其中很少很少的一部分,这个时候让只需要这一小部分消息的服务都去重复过滤大部分无用的消息,一来浪费机器资源,二来则是消息不够『干净』导致下游服务重复的各种 case 处理。octopusflow-router 的设计初衷有以下几个:

  1. 动态过滤与路由消息至不同的 topic
  2. 从源头统一 DB binlog 增量和存量(select)数据模型,并解决 select 和 binlog 同时使用带来的时序乱序问题
  3. 尽量不改动 maxwell 配置,让 maxwell CDC 数据流管道尽可能稳定实时。使用 octopusflow-router 来动态控制

在 MQ 中引入这种过滤路由层,坏处则是会冗余消息,原来一个 topic 就搞定的逻辑现在拆散为多处了。好的轮子不嫌多,本着学习实践 Vert.x 和 Kafka Stream 的态度,octopusflow-router 值得拥有。

架构设计

从背景及需求上来看,这个需求其实是不难,统一 binlog 和 select 的地方需要费一些心思。为了追求优雅的使用方式,octopusflow-router 考虑了热加载配置及 kafka 流处理任务。为了充分体验 Java 和 Kafka 生态优秀的轮子,这里选用了 Vert.x, Kafka Stream 作为核心依赖库,使用 Vert.x 的 eventbus 解耦各组件。

从需求概览来看,可以分为 Kafka Stream filter 和 DB dump 两大模块,如下图所示

octopusflow-router-detail

Filter 和路由利用了 Kafka Stream 提供的 Predicate 和 Stream filter. Dump Mock 暂定支持全表、根据部分行 ID 和自定义 SQL 三种导数能力。

从具体实现分解上来看,如下图所示

octopusflow-router-impl

借助 Vert.x 极其优秀的扩展能力和 eventbus 的解耦能力,这里主要构建了配置中心热加载模块,支持 HTTP 方式获取及更新配置,其他配置方式可以自行增加 Config Store 接入。配置有变更后,通过 eventbus 发布,实例 StreamsTask Verticle 捕获事件并根据最新的配置热加载 Kafka Stream 流处理任务。

虽然是第一次体验 Vert.x, 但还是被它的轻量级和扩展性震惊到了,非常典型的事件驱动编程方式,通过 Verticle 水平扩展和隔离,架构也变得优雅了起来。相比于 Spring 之类的重量级武器,Vert.x 让你感觉你不是在用框架而只是在用一个库而已,而且这个库的众多能力可以自己掌控,可以说是非常轻量级了。对于 Kafka Stream, 想必知道的人就更少了,很多人做流处理眼中基本只有 Flink, Spark, 实际上对于轻量级流处理,尤其是使用 Kafka 作为 MQ 的,Kafka Stream 可以说是非常轻量和优秀了。这两个库可以让你忘记一切框架,你的代码你做主。

如何使用

对于用户来说,最关心的莫过于如何使用了,这里最关键的其实是配置文件,从配置文件基本就能知道 octopusflow-router 具体在做什么了。

{
  "http.port": 8080,
  "kafka": {
    "servers": "192.168.16.177:9094"
  },
  "task": {
    "maxwell_key_router": {
      "task_type": "MaxwellKeyTask",
      "source_topics": ["maxwell"],
      "sink_routers": {
        "octopusflow-user": {
          "whitelist": ["user.account", "user.info"]
        },
        "octopusflow-nlp": {
          "whitelist": ["nlp.ogeek-dataset"]
        }
      }
    }
  }
}
  • http.port 顾名思义,是用来通过 HTTP 获取和更新配置的。/config 即为配置入口,GET 为获取,POST 为更新。
  • kafka 为全局配置,也可以嵌入到 task 内层中局部覆盖。
  • task 为 octopusflow-router 核心配置,默认 key(比如任务名 maxwell_key_router) 即为 kafka consumer group 的组名
  • task_type 为任务类型,目前就一个 MaxwellKeyTask, 工厂方法将根据不同的任务类型绑定不同的任务实例。
  • source_topics 为监听的 topic 列表
  • octopusflow-user 或者 octopusflow-nlp 为写入的 topic 名,同时这个 key 下绑定白名单 whitelist
  • whitelist 过滤用的白名单,可以使用 r'' 启用正则模式,匹配上了就通过并写入 sink topic,未匹配则 block

虽然有这篇简单的设计和实现文档,估计还是会有人不知道这个项目到底在干啥的,对 Vert.x, Kafka Stream 和 maxwell 感兴趣的可以参考本项目源码地址 octopusflow-router, 纯 Java 打造。

本篇文章的大背景其实是前文 搜索/推荐工程之实时 CDC 数据流 的一部分,有兴趣的可以前往阅读。