PS:我这里使用的是自定义的RoketMQ进行消息的发送和消费的,原理都差不多,万变不离其宗。
创建配置文件类
-
首先创建RocketMqConfig、RocketMqProducerConfig、RocketMqConsumerConfig类,里面包含RocketMQ所需的所有配置,等到创建Consumer和Producer的时候可以一键配置,文中所有代码块均省略getter\setter方法
public class RocketMqConfig { private String namesrvAddr = "127.0.0.1:9876"; private RocketMqProducerConfig producerConfig = new RocketMqProducerConfig(); private RocketMqConsumerConfig consumerConfig = new RocketMqConsumerConfig(); }
public class RocketMqProducerConfig { private String groupName = "producer"; private String instanceName = "producer_instance"; private String topic = "topic"; }
public class RocketMqConsumerConfig { //组名 private String groupName = "consumer"; //实例名 private String instanceName = "consumer_instance"; // 订阅主题和标签Map private Map<String, String> subscriptions = new HashMap<>(); //设置批量消费,以提升消费吞吐量,默认是1 private int consumeMessageBatchMaxSize = 1; }
创建Consumer和Producer
-
创建RocketMqProducer、RocketMqConsumer类,里面包括构造方法(有参、无参)、start、stop等方法
public class RocketMqProducer implements MqProducer { private DefaultMQProducer producer; private RocketMqConfig config; private boolean isStarted = false; public RocketMqProducer(RocketMqConfig config) { this.config = config; producer = new DefaultMQProducer(config.getProducerConfig().getGroupName()); producer.setInstanceName(config.getProducerConfig().getInstanceName()); producer.setVipChannelEnabled(false); producer.setNamesrvAddr(config.getNamesrvAddr()); } public boolean isStarted() { return isStarted; } public void start() { producer.start(); isStarted = true; } public void stop() { producer.shutdown(); isStarted = false; } public MqSendResult send(String tag, AbstractMessage t) { t.setTopic(config.getProducerConfig().getTopic()); t.setTag(tag); Message msg = new Message(t.getTopic(), t.getTag(), t.getKey(), t.getMessageType().getType(), t.getBody(), true); SendResult sendResult = producer.send(msg); t.setMessageId(sendResult.getMsgId()); // 返回结果 MqSendResult mqSendResult = new MqSendResult(); mqSendResult.setSuccess(true); mqSendResult.setCode(sendResult.getSendStatus().toString()); mqSendResult.setMsgId(sendResult.getMsgId()); return mqSendResult; } }
public class RocketMqConsumer implements MqConsumer { private MqMessageHandler handler = null; private DefaultMQPushConsumer consumer; private RocketMqConfig config; private boolean isStarted = false; public RocketMqConsumer(RocketMqConfig config) { this.config = config; consumer = new DefaultMQPushConsumer(config.getConsumerConfig().getGroupName()); consumer.setInstanceName(config.getConsumerConfig().getInstanceName()); consumer.setVipChannelEnabled(false); consumer.setConsumeMessageBatchMaxSize(config.getConsumerConfig().getConsumeMessageBatchMaxSize()); consumer.setNamesrvAddr(config.getNamesrvAddr()); // 从队列头开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); } @Override public boolean isStarted() { return isStarted; } @Override public void start() { consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext Context) { for (MessageExt msg : msgs) { System.out.println(msg.getbody()); } if (handler != null) { handler.handle(t); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); try { for(Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) { consumer.subscribe(sub.getKey(), sub.getValue()); } consumer.start(); isStarted = true; } catch (Exception ex) { isStarted = false; throw new Exception(MqLibConstants.LIB_MQ_ROCKETMQ, MqLibExceptionEnum.MQ_CLIENT_EXCEPTION, ex); } } @Override public void stop() { for(Entry<String, String> sub : config.getConsumerConfig().getSubscriptions().entrySet()) { consumer.unsubscribe(sub.getKey()); } consumer.shutdown(); isStarted = false; } @Override public void setHandler(MqMessageHandler handler){ this.handler = handler; } }
消息发送与消费
-
创建Producer类,在其中创建RocketMqConfig、RocketMqProducer对象;
public class Producer { public static void main(String[] args) { RocketMqConfig config = new RocketMqConfig(); MqProducerClient producer = new MqProducerClient(config); producer.start();//启动生产者 JsonMessage jsonMessage = new JsonMessage();//自定义消息类 jsonMessage.setData("JsonMessage!"); producer.send("json", jsonMessage); } }
-
创建Consumer类,在其中创建RocketMqConfig、RocketMqConsumer对象;
public class Consumer { public static void main(String[] args) { RocketMqConfig config = new RocketMqConfig(); Map<String,String> map = new HashMap(); map.put(config.getProducerConfig().getTopic(),"json"); config.getConsumerConfig().setSubscriptions(map); MqConsumerClient consumer = new MqConsumerClient(config); consumer.start(); } }
启动两个类,同时别忘了启动RocketMQ的服务??梢栽赗ocketMQ可视化界面看到生产者、消费者以及消息等信息