1.介绍
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
2.撸代码
- 配置队列,绑定交换机
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Spring configuration for the fanout demo.
 *
 * <p>Declares three queues, one fanout exchange, and one binding per queue
 * that attaches the queue to that exchange.
 */
@Configuration
public class FanoutRabbitConfig {

    private static final String QUEUE_A = "q_fanout_A";
    private static final String QUEUE_B = "q_fanout_B";
    private static final String QUEUE_C = "q_fanout_C";
    private static final String EXCHANGE = "mybootfanoutExchange";

    /** @return the queue named {@code q_fanout_A} */
    @Bean
    public Queue aMessage() {
        return new Queue(QUEUE_A);
    }

    /** @return the queue named {@code q_fanout_B} */
    @Bean
    public Queue bMessage() {
        return new Queue(QUEUE_B);
    }

    /** @return the queue named {@code q_fanout_C} */
    @Bean
    public Queue cMessage() {
        return new Queue(QUEUE_C);
    }

    /** @return the fanout exchange named {@code mybootfanoutExchange} */
    @Bean
    FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE);
    }

    // NOTE(review): three Queue beans exist in this class, so the parameter
    // names below presumably select which one is injected -- keep each name
    // identical to the corresponding queue bean method name.

    /** Binds the {@code aMessage} queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
        return attach(aMessage, fanoutExchange);
    }

    /** Binds the {@code bMessage} queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
        return attach(bMessage, fanoutExchange);
    }

    /** Binds the {@code cMessage} queue to the fanout exchange. */
    @Bean
    Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
        return attach(cMessage, fanoutExchange);
    }

    /** Builds the binding of {@code queue} to {@code exchange}. */
    private static Binding attach(Queue queue, FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }
}
- 生产者
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
 * Producer for the fanout demo: prints a fixed text message and then sends it
 * to the exchange named {@code mybootfanoutExchange} with an empty routing key.
 */
@Component
public class MsgSenderFanout {

    private static final String EXCHANGE_NAME = "mybootfanoutExchange";
    private static final String ROUTING_KEY = "";
    private static final String PAYLOAD = "hi, fanout msg ";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    /**
     * Logs the payload to standard output and publishes it through the
     * injected {@link AmqpTemplate}.
     */
    public void send() {
        System.out.println("Sender : " + PAYLOAD);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, PAYLOAD);
    }
}
创建3个消费者
- 消费者1
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the queue {@code q_fanout_A}; prints every text message it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = "q_fanout_A")
public class ReceiverA {

    /**
     * Prints the received message, followed by a blank line.
     *
     * @param hello the message body delivered to this listener
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" is the newline escape; the previous "/n" was printed literally
        // as the two characters '/' and 'n' after every message.
        System.out.println("AReceiver : " + hello + "\n");
    }
}
- 消费者2
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the queue {@code q_fanout_B}; prints every text message it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = "q_fanout_B")
public class ReceiverB {

    /**
     * Prints the received message, followed by a blank line.
     *
     * @param hello the message body delivered to this listener
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" is the newline escape; the previous "/n" was printed literally
        // as the two characters '/' and 'n' after every message.
        System.out.println("BReceiver : " + hello + "\n");
    }
}
- 消费者3
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
 * Consumer for the queue {@code q_fanout_C}; prints every text message it
 * receives to standard output.
 */
@Component
@RabbitListener(queues = "q_fanout_C")
public class ReceiverC {

    /**
     * Prints the received message, followed by a blank line.
     *
     * @param hello the message body delivered to this listener
     */
    @RabbitHandler
    public void process(String hello) {
        // "\n" is the newline escape; the previous "/n" was printed literally
        // as the two characters '/' and 'n' after every message.
        System.out.println("CReceiver : " + hello + "\n");
    }
}
- 测试
package com.zpc.rabbitmq.fanout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
 * Spring Boot test that triggers a single fanout publish through
 * {@link MsgSenderFanout}.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitFanoutTest {

    @Autowired
    private MsgSenderFanout msgSender;

    /**
     * Sends one message via the injected sender. The test makes no
     * assertions; the outcome is observed from the console output.
     */
    @Test
    public void send1() throws Exception {
        // NOTE(review): nothing here waits for the listeners, so the receiver
        // output presumably depends on them running before the context stops.
        this.msgSender.send();
    }
}
结果如下,三个消费者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg