本文最后更新于 2024-06-14,文章内容可能已经过时。

 // RocketMQ的延迟消息。
 // rocketMQTemplate.syncSend(主题, 消息, 超时时间, 延迟等级);

生产端——消息发送

@Service
public class Server {

    private RocketMQTemplate rocketMQTemplate;
    
	public Server(RocketMQTemplate rocketMQTemplate) {
        this.rocketMQTemplate = rocketMQTemplate;
    }

   /**
   *发送延迟订单消息
   **/
    public void sendDelayOrderMsg(String msg) {
    	Message<String> msgBody = MessageBuilder.withPayload(msg).build();
        rocketMQTemplate.syncSend("delay_order", msgBody , 2000, 16);
    }
}

消费端——延迟消息监听

@Component
@RocketMQMessageListener(topic = "delay_order",consumerGroup = "order-consumer", selectorExpression = "*")
public class OrderListner implements RocketMQListener<String> {

    @Override
    public void onMessage(String msg) {
        System.out.println("接收到的延迟消息:"+msg);
        //todo 自定义义务处理。
        //逻辑删除该订单数据
    }
}

保证消息消费成功

rocketmq-spring-boot-starter在监听消息就实现了自动提交ack

RocketMQListener的onMessage方法不抛异常都会自动提交ack。

抛出异常则进行重试。