ps:
1、本文示例使用的消息中间件为Rabbitmq
。
2、示例代码是以测试用例的形式给出。
3、使用@ActiveProfiles( active_profile(s) )
让指定配置生效。
背景
前文 Spring Cloud Stream 进阶配置——高吞吐量(一) 说到,最简单的提高消费端吞吐量的方式为:增加消费者数量,但具体增加到多少是很难确定的,因为各种各样的因素都会导致消费者数量冗余或不足,比如在用户访问高峰期,消费者数量肯定需要比较多,而在平时或凌晨,访问量降到低谷,消费者数量肯定就冗余了,甚至,一个队列只在某个定时任务才会用到,那么在平时多消费者基本就只有占用系统资源的作用了。
最大消费者数量
好在,Spring Cloud Stream
为我们提供了一个配置,用于控制 最大消费者数量,当然还有另一个隐藏的使用功能——动态创建/销毁消费者。配置为:spring.cloud.stream.rabbit.bindings.<channelName>.consumer. maxConcurrency
,比如:spring.cloud.stream.rabbit.bindings.input.consumer. maxConcurrency
。另外,值得注意的是,要区别于配置:spring.cloud.stream.bindings.<channelName>.consumer.concurrency
,可以看到 最大消费者数量 配置的前缀多了 rabbit
,因为该配置是只针对 Rabbitmq
的,其他的中间件如 Kafka
则没有。
ps: 不同的消息中间件,因为不同的架构,配置大多都不一样,所以针对那些中间件特有的属性配置,前缀需要多加一个
binder
类型前缀,即rabbit
、kafka
。
示例
以下代码可在 源码 查看。
配置
spring:
application:
name: scas-data-collection
profiles:
active:
default
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
packetUplinkInput:
destination: packetUplinkTopic
content-type: application/json
group: ${spring.application.name}
binder: rabbit
consumer:
concurrency: 3 # 初始/最少/空闲时 消费者数量。默认1
rabbit:
bindings:
packetUplinkInput:
consumer:
maxConcurrency: 10 # 默认:1。queue的消费者的最大数量。当前消费者数量不足以及时消费消息时, 会动态增加消费者数量, 直到到达最大数量, 即该配置的值.
代码
消息模型
@Data
@NoArgsConstructor
@AllArgsConstructor
public class PacketModel {
/**
* 设备 eui
*/
private String devEui;
/**
* 数据
*/
private String data;
// 省略其他字段
}
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@ActiveProfiles("maxConcurrency")
@EnableBinding({ScasMaxConcurrencyTest.MessageSink.class, ScasMaxConcurrencyTest.MessageSource.class})
public class ScasMaxConcurrencyTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
@Test
public void test() throws InterruptedException {
// 启动时先睡眠20s, 方便在控制面板查看 初始消费者数量
Thread.sleep(20000);
for (int i = 0; i < 50000; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(10000000);
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
@Component
public static class PacketUplinkHandler {
@StreamListener("packetUplinkInput")
public void handle(PacketModel model) throws InterruptedException {
Thread.sleep(20);
log.info("消费上行数据包消息. model: [{}].", model);
}
}
public interface MessageSink {
@Input("packetUplinkInput")
SubscribableChannel packetUplinkInput();
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}
运行测试用例
运行测试用例后,访问 Rabbitmq可视化页面 可以看到类似下图的页面:
过一段时间后,消费者数量 则动态增加到6,如下图:
在过段时间会增加到30,即增加到消费者数量的最大值,如下图:
当堆积的消息被消费完,消费者处于空闲状态,于是会逐渐销毁空闲消费者,直到剩余消费者数量等于初始值(3):
再次发布消息
因为程序是以测试用例的形式启动的,所以要想不破坏消费者数量,那么需要单独再增加一个测试用例,该测试用例单纯只有消息发布功能。如下:
配置
spring:
application:
name: scas-data-collection
profiles:
active:
default
cloud:
stream:
binders:
rabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
packetUplinkOutput:
destination: packetUplinkTopic
content-type: application/json
binder: rabbit
测试用例
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
@EnableBinding({ScasOnlyProducerTest.MessageSource.class})
@ActiveProfiles("onlyProducer")
public class ScasOnlyProducerTest {
@Autowired
private PacketUplinkProducer packetUplinkProducer;
private Random random = new Random();
private List<String> devEuis = new ArrayList<>(10);
@PostConstruct
private void initDevEuis() {
devEuis.add("10001");
devEuis.add("10002");
devEuis.add("10003");
devEuis.add("10004");
devEuis.add("10005");
devEuis.add("10006");
devEuis.add("10007");
devEuis.add("10008");
devEuis.add("10009");
devEuis.add("10010");
}
/**
* 使用该测试用例, 之前降下来的消费者数量, 最后又会升上去
* @throws InterruptedException
*/
@Test
public void test1() throws InterruptedException {
while (true) {
int msgCount = 10;
for (int i = 0; i < msgCount; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(1000);
}
}
/**
* 使用该测试用例, 消费者数量基本保持不变
* @throws InterruptedException
*/
@Test
public void test2() throws InterruptedException {
while (true) {
int msgCount = 3;
for (int i = 0; i < msgCount; i++) {
String devEui = getDevEuis();
packetUplinkProducer.publish(new PacketModel(devEui, UUID.randomUUID().toString()));
}
Thread.sleep(3000);
}
}
private String getDevEuis() {
return devEuis.get(random.nextInt(10));
}
@Component
public static class PacketUplinkProducer {
@Autowired
private MessageSource messageSource;
public void publish(PacketModel model) {
log.info("发布上行数据包消息. model: [{}].", model);
messageSource.packetUplinkOutput().send(MessageBuilder.withPayload(model).build());
}
}
public interface MessageSource {
@Output("packetUplinkOutput")
MessageChannel packetUplinkOutput();
}
}
注意,启动下文的测试用例时,之前的测试用例不能关闭,因为要验证动态增加/销毁消费者。
启动测试用例一
使用该测试用例, 之前降下来的消费者数量, 最后又会升上去。
启动测试用例二
先关闭上一个测试用例
通过上面2个测试用例的结果,可以推测:在某个时刻,当 消息发布速率 > 消息消费速率 时,如果当前 消费者数量 < 最大消费者数量,那么 Spring Cloud Stream
会帮我们创建新的消费者,直到 消费者数量 = 最大消费者数量。
结论
配置 spring.cloud.stream.rabbit.bindings.<channelName>.consumer. maxConcurrency
,可以让应用拥有动态增加/销毁消费者的能力,达到动态控制消费者数量的目的。再配合 spring.cloud.stream.bindings.<channelName>.consumer.concurrency
,可以将消费者数量控制在一定范围内。
另外,需要注意的是,maxConcurrency
并不是越大越好,因为该值越大,若一直有大量消息发布,那么 Spring Cloud Stream
则会一直增加消费者,知道消费者数量等于 maxConcurrency
,而一个消费者对应了一个线程,线程太多的话,会加载系统的负担,从而可能影响到其他业务。
但是如果不增加消费者数量的话,而消息发布的吞吐量又居高不下,该怎么办呢?实际上,增加消费者数量,其最终目的就是提升消费端的消费力,那我们可以从别的方式入手,比如增加应用实例数量,其实这也是另一种增加消费者数量的方式,只是在其他机器而已;而如果暂时没有增加机器的预算,那么可以考虑另一种方案:Spring Cloud Stream 进阶配置——高吞吐量(三)
推荐阅读
Spring Cloud Stream 进阶配置——高吞吐量(一)——多消费者
Spring Cloud Stream 进阶配置——高吞吐量(三)——批量预取消息(prefetch)