sarama是kafka go语言的的一个框架,star数也挺高的,这里记录下使用遇到的一些问题。
跟着官方给的例子学习,文件consumer_group_test.go给出了如何使用消费组的例子。文件里有以下一段代码:
// Iterate over consumer sessions.
ctx := context.Background()
for {
topics := []string{"my-topic"}
handler := exampleConsumerGroupHandler{}
err := group.Consume(ctx, topics, handler)
if err != nil {
panic(err)
}
}
问题: 消费组不生效
for循环引发的问题
经过测试代码的for循环测试发现去掉也可以正常消费,于是觉得没有加上for循环。等去掉for循环后之后,诡异的问题出现了。消费组不生效了,比如一个消费组内部起了3个客户端,发现只有一个客户端生效会消费消息,很诡异。后面经过问题排查发现这个for循环不能够去掉,把for循环加上消费组才又生效了。
经过测试发现,rebalance,断网重连都会重新触发for循环代码又运行一次,也意味着 group.Consume(ctx, topics, handler)这个方法在没有遇到上面两种情况的时候是阻塞的,消息消费正常,所以去掉for循环也没关系。一旦遇到上面两种情况,方法就会返回,此时需要重新进入才可以正常消费消息。
参数sarama.OffsetNewest引发的问题
config.Consumer.Offsets.Initial = sarama.OffsetNewest
``
上面这个参数是控制消费组初始消费的位置,默认是OffsetNewest,即从最新的位置开始消费。如果把参数改为如下:
config.Consumer.Offsets.Initial = sarama.OffsetOldest
``
即从最早的位置开始消费?;岱⑾忠不嵊龅较炎槟谥挥幸桓鱿芽突Ф嗽谙严?,其他消费组内的客户端不消费消息。诶,这个问题很诡异。。由于我们的业务只需要从最新位置开始消费消息即可,所以就不去深究原因了。。
参数sarama.BalanceStrategyRange引发的问题
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRange
上面这个参数是控制rebalance时的分区策略,默认是按照范围来分区,即BalanceStrategyRange。
此时如果把配置更改为如下:
config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
上面是采用轮询的方式来分区。如果改成改参数,会报一下异常:
The provider group protocol type is incompatible with the other members.
这是因为无法正确分区导致的问题,此时把分区策略改为 BalanceStrategyRange即可,就不会再有上面这个错误出现。