kafka3 consumer
[TOC]
消费组
往群组里增加消费者是横向伸缩消费能力的主要方式
消费组内的消费者可以订阅不同的topic,这意味着不是所有的消费者都能接收到某个topic的消息,而必须要订阅
因此逻辑上消费者的查找关系是: 消费组–>订阅了该消息的消费者们–>消费者
分区再均衡
由于我们可能动态的增加和缩减消费组内的消费者(因为一个消费者是一个进程,增加一个进程是理所当然的),所以,分区与消费者的对应关系需要变化
在再均衡期间,消费者无法读取消息,会有短时间内的消息队列不可用
由于分区消费offset是由消费者维护的,当由新的消费者接替时,将会丢失offset,这时需要去zk/broker同步offset,也会增加一些开销
群组协调器
某个broker将会被指定为某个消费组的协调器,不同的消费组可以有不同的协调器.
消费者将会往协调器发送心跳heartbeat,来维持消费者和消费组的从属关系以及和分区的对应(所有权)关系.
消费组会在消费消息或提交偏移量时发送心跳,如果消费者较长时间未发送心跳,协调器就认为该消费者宕机,便触发一次再平衡
分区分配过程
无论是再分配还是初始分配过程,都是一样的,针对给定的分区和消费者,均匀分配所有权关系
该分配过程由消费组的群主承担,第一个加入消费组的消费者自动成为群主.群主从协调器那里获得群组成员列表和分区数,并给每个分区分配一个消费者.分配完毕后,群主将结果发送给协调器,协调器再将每个消费者的分配情况发送给消费者,消费者只能看见自己被分配的分区
创建kafka消费者
创建消费者需要指定:
- bootstrap.servers,比如:“broker1:9092,broker2:9092”
- bootstrap.servers也就是broker-list,但不需要包含所有的broker,客户端会做一个搜索,搜出所有的broker,一般指定两个,防止其中一个宕机
- group-id,即所属的消费组
- 反序列化器
订阅主题
consumer可以通过列表或正则的方式订阅主题
注意: 同一个Group中的不同Consumer实例可以订阅不同的Topic
即:订阅是消费者的行为,而不是消费组的行为
轮询
kafka是pull类型的消息队列,需要消费者自身定期轮询broker
轮询不只是包含消息拉取,还包含查找群组协调器并加入该组,接收分配的分区,以及后续的再平衡过程,以及心跳的发送
消息提交和偏移量
kafka不会像其他__JMS__队列一样,需要消费者在消费成功后返回ack,这是因为偏移量offset是由消费者自己维护的.
jms指java message service,是一种消息队列api规范
为了保证分区再平衡或消费者宕机后的消息偏移量丢失,消费者仍然需要以一种方式去向消息队列提交偏移量
具体操作是,消费者往一个叫__"_consumer_offset"__的特殊主题上发送消息,消息包含该消费者消费分区的偏移量
自动提交
消费者可以设置自己的提交方式为自动提交,默认下,每过5s,消费者就会把当前偏移量提交,间隔可以由某个参数设置,但是注意,提交操作是在轮询操作中的,而不是真正的定时任务,所以,提交间隔可能大于设置的参数.
自动提交一般以配置文件的方式设置
但显然,提交的offset总是要落后于当前offset的,再平衡后,消息将会被重复处理
手动同步提交
调用consumer.commitSync()
来提交当前poll()返回的最新的offset
手动异步提交
同步提交会阻塞进程,可以使用consumer.commitAsync()
来避免对程序吞吐量的降低
错误处理
我们在poll_loop里面,将会使用异步的提交,但是如果发生错误要退出进程了,将会在退出前使用同步提交同步一次offset
再均衡回调
当消费者订阅一个主题时,允许其传入一个再均衡处理器的回调类 ,该处理器需要实现两个函数
- onPartitionsAssigned
- onPartitionsRevoked
当由于新消费者加入或旧消费者退出导致的再平衡时,该处理器将会被执行.一般的,我们借助这个处理器来提交当前处理的offset
在go中,其实可以定义两个接口,一个实现onPartitionsAssigned,一个实现onPartitionsRevoked,然后传入的时候,参数形式是interface{},内部再断言一下就可以了
可以参考rpcx的插件开发,它会将所有的插件无差别的注册在一起,以interface{}存储,然后使用的时候,我们在一个特定的地方需要调用一个特定的插件,此时便遍历所有插件断言能否转成特定的插件
流指针改变
我们说消息队列是一种伪流,因为我们仍然可以操作流指针,使之回退或前进,相当于文件流指针的用法,通过seek方法改变当前读取位置
优雅退出
我们可能希望能在其他线程关闭所有的或一些消费者,在go语言里这是简单的,只需要一个经典的for{select{}}即可
对于信号,我们也要捕捉,因为在关闭前,我们需要做一些收尾工作,比如首先停止poll,但是对正在处理的消息,我们仍然要等它处理完成,然后提交offset.最后退出
独立消费者
有时候,我们可能不想让消费者加入某个消费组,而只是让他独立存在,这时候我们可以手动分配分区
前面我们订阅分区是通过consumer.subscribe("topic" , rebalanceHandler)
,消费组的id是在创建consumer时指定的.
手动分配分区时,首先去查询某个topic的所有分区列表partitionInfos = consumer.partitionsFor("topic")
,然后遍历列表,组合成assign api要求的数据形式,最后调用assign.
注意consumer.assign(partitions)
中,这个partitions是类似于[]TopicPartition一样的数组,TopicPartition是{topic,partition}是结构,因此,这是为了能订阅不同主题的不同partition而设计的.