[mq] kafka3 consumer

kafka3 consumer

[TOC]

消费组

往群组里增加消费者是横向伸缩消费能力的主要方式

消费组内的消费者可以订阅不同的topic,这意味着不是所有的消费者都能接收到某个topic的消息,而必须要订阅

因此逻辑上消费者的查找关系是: 消费组–>订阅了该消息的消费者们–>消费者

分区再均衡

由于我们可能动态的增加和缩减消费组内的消费者(因为一个消费者是一个进程,增加一个进程是理所当然的),所以,分区与消费者的对应关系需要变化

在再均衡期间,消费者无法读取消息,会有短时间内的消息队列不可用

由于分区消费offset是由消费者维护的,当由新的消费者接替时,将会丢失offset,这时需要去zk/broker同步offset,也会增加一些开销

群组协调器

某个broker将会被指定为某个消费组的协调器,不同的消费组可以有不同的协调器.

消费者将会往协调器发送心跳heartbeat,来维持消费者和消费组的从属关系以及和分区的对应(所有权)关系.

消费组会在消费消息或提交偏移量时发送心跳,如果消费者较长时间未发送心跳,协调器就认为该消费者宕机,便触发一次再平衡

分区分配过程

无论是再分配还是初始分配过程,都是一样的,针对给定的分区和消费者,均匀分配所有权关系

该分配过程由消费组的群主承担,第一个加入消费组的消费者自动成为群主.群主从协调器那里获得群组成员列表和分区数,并给每个分区分配一个消费者.分配完毕后,群主将结果发送给协调器,协调器再将每个消费者的分配情况发送给消费者,消费者只能看见自己被分配的分区

创建kafka消费者

创建消费者需要指定:

  1. bootstrap.servers,比如:“broker1:9092,broker2:9092”
    1. bootstrap.servers也就是broker-list,但不需要包含所有的broker,客户端会做一个搜索,搜出所有的broker,一般指定两个,防止其中一个宕机
  2. group-id,即所属的消费组
  3. 反序列化器

订阅主题

consumer可以通过列表或正则的方式订阅主题

注意: 同一个Group中的不同Consumer实例可以订阅不同的Topic

即:订阅是消费者的行为,而不是消费组的行为

轮询

kafka是pull类型的消息队列,需要消费者自身定期轮询broker

轮询不只是包含消息拉取,还包含查找群组协调器并加入该组,接收分配的分区,以及后续的再平衡过程,以及心跳的发送

消息提交和偏移量

kafka不会像其他__JMS__队列一样,需要消费者在消费成功后返回ack,这是因为偏移量offset是由消费者自己维护的.

jms指java message service,是一种消息队列api规范

为了保证分区再平衡或消费者宕机后的消息偏移量丢失,消费者仍然需要以一种方式去向消息队列提交偏移量

具体操作是,消费者往一个叫__"_consumer_offset"__的特殊主题上发送消息,消息包含该消费者消费分区的偏移量

自动提交

消费者可以设置自己的提交方式为自动提交,默认下,每过5s,消费者就会把当前偏移量提交,间隔可以由某个参数设置,但是注意,提交操作是在轮询操作中的,而不是真正的定时任务,所以,提交间隔可能大于设置的参数.

自动提交一般以配置文件的方式设置

但显然,提交的offset总是要落后于当前offset的,再平衡后,消息将会被重复处理

手动同步提交

调用consumer.commitSync()来提交当前poll()返回的最新的offset

手动异步提交

同步提交会阻塞进程,可以使用consumer.commitAsync()来避免对程序吞吐量的降低

错误处理

我们在poll_loop里面,将会使用异步的提交,但是如果发生错误要退出进程了,将会在退出前使用同步提交同步一次offset

再均衡回调

当消费者订阅一个主题时,允许其传入一个再均衡处理器的回调类 ,该处理器需要实现两个函数

  • onPartitionsAssigned
  • onPartitionsRevoked

当由于新消费者加入或旧消费者退出导致的再平衡时,该处理器将会被执行.一般的,我们借助这个处理器来提交当前处理的offset

在go中,其实可以定义两个接口,一个实现onPartitionsAssigned,一个实现onPartitionsRevoked,然后传入的时候,参数形式是interface{},内部再断言一下就可以了

可以参考rpcx的插件开发,它会将所有的插件无差别的注册在一起,以interface{}存储,然后使用的时候,我们在一个特定的地方需要调用一个特定的插件,此时便遍历所有插件断言能否转成特定的插件

流指针改变

我们说消息队列是一种伪流,因为我们仍然可以操作流指针,使之回退或前进,相当于文件流指针的用法,通过seek方法改变当前读取位置

优雅退出

我们可能希望能在其他线程关闭所有的或一些消费者,在go语言里这是简单的,只需要一个经典的for{select{}}即可

对于信号,我们也要捕捉,因为在关闭前,我们需要做一些收尾工作,比如首先停止poll,但是对正在处理的消息,我们仍然要等它处理完成,然后提交offset.最后退出

独立消费者

有时候,我们可能不想让消费者加入某个消费组,而只是让他独立存在,这时候我们可以手动分配分区

前面我们订阅分区是通过consumer.subscribe("topic" , rebalanceHandler),消费组的id是在创建consumer时指定的.

手动分配分区时,首先去查询某个topic的所有分区列表partitionInfos = consumer.partitionsFor("topic"),然后遍历列表,组合成assign api要求的数据形式,最后调用assign.

注意consumer.assign(partitions)中,这个partitions是类似于[]TopicPartition一样的数组,TopicPartition是{topic,partition}是结构,因此,这是为了能订阅不同主题的不同partition而设计的.