图文并茂讲解RocketMQ消息类别
1、同步消息
即时性较强,重要的消息,且必须有回执的消息,例如短信,通知(转账成功)
生产者:
public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); producer.start(); for (int i = 1; i <= 5; i++) { Message msg = new Message("topic2",("同步消息:hello rocketmq "+i).getBytes("UTF-8")); //同步消息发送 SendResult result = producer.send(msg); System.out.println("返回结果:"+result); } producer.shutdown(); } }
消费者:
public class Consumer { public static void main(String[] args) throws Exception{ DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.23.127:9876"); consumer.subscribe("topic2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者 } }); consumer.start();// 开启多线程 监控消息,持续运行 System.out.println("接收消息服务已运行"); } }
测试:
2、异步消息
即时性较弱,但需要有回执的消息,例如订单中的某些信息
生产者:
public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); producer.start(); for (int i = 1; i <= 5; i++) { //异步消息发送 Message msg = new Message("topic2",("异步消息:hello rocketmq "+i).getBytes("UTF-8")); producer.send(msg, new SendCallback() { //表示成功返回结果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示发送消息失败 @Override public void onException(Throwable throwable) { System.out.println(throwable); } }); } //添加一个休眠操作,确保异步消息返回后能够输出 TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
消费者:
public class Consumer { public static void main(String[] args) throws Exception{ DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("group1"); consumer.setNamesrvAddr("192.168.23.127:9876"); consumer.subscribe("topic2","*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { for (MessageExt msg : list) { //System.out.println("收到消息:"+msg); System.out.println("消息:"+new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功处理, mq 收到这个 标记后相同的消息讲不会再次发给消费者 } }); consumer.start();// 开启多线程 监控消息,持续运行 System.out.println("接收消息服务已运行"); } }
测试:
3、单向消息
不需要有回执的消息,例如日志类消息
生产者:
public class Producer { public static void main(String[] args) throws Exception{ DefaultMQProducer producer=new DefaultMQProducer("group1"); producer.setNamesrvAddr("192.168.23.127:9876"); producer.start(); for (int i = 1; i <= 5; i++) { //单向消息 Message msg = new Message("topic2",("单向消息:hello rocketmq "+i).getBytes("UTF-8")); producer.sendOneway(msg); } //添加一个休眠操作,确保异步消息返回后能够输出 TimeUnit.SECONDS.sleep(10); producer.shutdown(); } }
消费者代码同上
测试:
总结 同步消息
SendResult result = producer.send(msg);
异步消息(回调处理结果必须在生产者进程结束前执行,否则回调无法正确执行)
producer.send(msg, new SendCallback() { //表示成功返回结果 @Override public void onSuccess(SendResult sendResult) { System.out.println(sendResult); } //表示发送消息失败 @Override public void onException(Throwable throwable) { System.out.println(throwable); } });
单向消息
producer.sendOneway(msg);
到此这篇关于图文并茂讲解RocketMQ消息类别的文章就介绍到这了,更多相关RocketMQ消息类别内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!
相关文章
javax.mail.SendFailedException: Sending failed问题原因
这篇文章主要介绍了javax.mail.SendFailedException: Sending failed问题原因,需要的朋友可以参考下2015-05-05IntelliJ IDEA 2023.2最新版激活方法及验证ja-netfilter配置是否成功
随着2023.2版本的发布,用户们渴望了解如何激活这个最新版的IDE,本文将介绍三种可行的激活方案,包括许可证服务器、许可证代码和idea vmoptions配置,帮助读者成功激活并充分利用IDEA的功能,感兴趣的朋友参考下吧2023-08-08springboot中RestTemplate发送HTTP请求的实现示例
RestTemplate是一个 spring-web 提供的执行HTTP请求的同步阻塞式工具类,本文就来介绍一下RestTemplate发送HTTP请求,具有一定的参考价值,感兴趣的可以了解一下2024-03-03IDEA安装部署Alibaba Cloud Toolkit的实现步骤
Alibaba Cloud Toolkit是阿里云针对IDE平台为开发者提供的一款插件,本文主要介绍了IDEA安装部署Alibaba Cloud Toolkit的实现步骤,具有一定的参考价值,感兴趣的可以了解一下2023-08-08Java中的HttpServletRequestWrapper用法解析
这篇文章主要介绍了Java中的HttpServletRequestWrapper用法解析,HttpServletRequest 对参数值的获取实际调的是org.apache.catalina.connector.Request,没有提供对应的set方法修改属性,所以不能对前端传来的参数进行修改,需要的朋友可以参考下2024-01-01
最新评论