06. KafkaConsumer是如何处理过期offset(越界offset)请求
该篇文章是基于以下的条件编写完成的
1. Kafka cluster集群版本0.10.0
2. 提前创建好了一个单partition,单副本的topic:kafka.log.retention
3. Kafka log清理策略配置log.retention.minutes=10,该配置的作用是清理掉当前时间减去日志段中最新时间的差值,如果大于10分钟,就将删除过期的日志段
接下来我们基于以下的步骤演示:
1. 编写程序向topic:kafka.log.retention中发送10条数据
输出结果
我们可以看到发送数据后最新时间是21:45,大概将在21:55的时候生成新的日志段文件
2. 我们消费其中的两条数据,并且commit offset:1
以下是KafkaConsumer的相关参数配置
消费的两条数据打印格式为:offset:key:value
通过以上的程序我们看到,并且同步提交了offset到kafka server端
3. 我们等10分钟时间,日志段将被清理
我们看到准时在21:55生成了新的日志段
4. 接下来我们再使用上面的消费程序接着消费数据
由于我们之前消费到了offset=1的地方,接下来要开始消费offset=2的数据,其实我们从第三步可以看到offset从2到9的数据已经被删除了,接下来我们看kafka是如何处理的
我们看到ApiKeys.FETCH请求offset=1,服务器会从>1的offset返回数据也就是offset=2
请求的响应如下图:
我们看到服务器返回了Errors.OFFSET_OUT_OF_RANGE(offset越界)的错误,由于我们设置了默认的offset重置策略,接下来会根据重置策略重新设置消费组获取数据的起始offset
会通过该方法重新获取起始offset
由于我们设置的策略是properties.put("auto.offset.reset", "earliest");所以从最早的offset开始,目前最早的offset=10
5. 我们将重置策略设置成:none
properties.put("auto.offset.reset", " none")
接着执行上面第4步的消费操作,kafka的client会怎样处理呢?
我们看到传递参数中的offset还是1
我们看到此时的fetch响应走到了else的分支
会在接下来走到该代码
会在该方法中抛出异常OffsetOutOfRangeException
堆栈异常
总结
通过我们上面的试验,我们发现不同的offset重置策略对kafka-client对过期请求的响应方式是不同的
Offse重置策略一共有三种:"latest", "earliest", "none"
"latest", "earliest"不影响程序的正常运行,会打印相关的offset越界日志
"none"会直接抛出OffsetOutOfRangeException异常