Welcome

首页 / 软件开发 / JAVA / SpringBoot+RabbitMq 实现延时发送消息-死信方式

SpringBoot+RabbitMq 实现延时发送消息-死信方式

由于考虑到RabbitMq支持高并发、持久化、高可用等优点,决定从activeMq转战RabbitMq,因为前期调研不充分,结果发现RabbitMq自身竟然不支持延时消息,只能通过死信或者集成插件来实现,这里先介绍死信的方式。


这里就不赘述Rabbit的详细用法了,就直接上代码介绍实现步骤啦。


一.定义消息队列的常量


public static final String ACT_QUEUE= "act_queue";//活动立即消费队列

public static final String ACT_DELAY_QUEUE= "act_delay_queue";//活动死信消费队列

public static final String ACT_EXCHANGE = "act_exchage";         //交换器

public static final String ACT_DEAD_LETTER_EXCHANGE = "act_dead_letter_exchange"; //死信交换器

public static final String ACT_ROUTING_KEY = "act_routing_key";//立即消费路由键

public static final String ACT_DELAY_ROUTING_KEY = "act_delay_routing_key"; //延迟路由键

二.配置队列


-------------------认真看,这可是完整代码


//立即消费队列

@Bean

public Queue actQueue() {

    return new Queue(ACT_QUEUE,true);

}

 

/**

 * 延迟队列: 创建一个延迟队列, 此队列中的消息没有消费者去消费, 到了过期时间之后变成死信, 变死信之后会根据

 *           绑定的DLX和routingKey重新发送到指定交换机再到指定队列。

*/

@Bean

public Queue delayQueue() {

    Map

    // x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,

    map.put("x-dead-letter-exchange", ACT_EXCHANGE);

    // x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。

    map.put("x-dead-letter-routing-key", ACT_ROUTING_KEY);

    Queue queue = new Queue(ACT_DELAY_QUEUE, true, false, false, map);

    System.out.println("----------------------------deadLetterQueue :" + queue.getArguments());

    return queue;

}

 

/**

 *  声明立即消费队列交换机-direct类型

 * 注:把消息投递到那些binding key与routing key完全匹配的队列中。

 * */

@Bean

public DirectExchange actExchage(){

    // 一共有三种构造方法,可以只传exchange的名字, 第二种,可以传exchange名字,是否支持持久化,是否可以自动删除,

    //第三种在第二种参数上可以增加Map,Map中可以存放自定义exchange中的参数

    return new DirectExchange (ACT_EXCHANGE,true, false);

}

 

//声明死信队列交换机-direct类型

@Bean

public DirectExchange deadLetterExchange() {

    return new DirectExchange(ACT_DEAD_LETTER_EXCHANGE, true, false);

}

 

/**

 * @description   绑定队列与交换机

 *                  把立即消费的队列和立即消费交换机绑定, immediate_exchange, 路由键:immediate_routing_key

 * @author        songchengye

 * @date          2020/10/24 13:24

 */

@Bean

public Binding actBinding() {

    return BindingBuilder.bind(actQueue()).to(actExchage()).with(ACT_ROUTING_KEY);

}

 

/**

 * @description   死信队列的交换机绑定:把延迟消费的队列和死信交换机绑定, immediate_dead_exchange, 路由键:delay_routing_key

 * @author        songchengye

 * @date          2020/10/24 13:24

*/

@Bean

public Binding queueDeadBinding() {

    return BindingBuilder.bind(delayQueue()).to(deadLetterExchange()).with(ACT_DELAY_ROUTING_KEY);

}

三.编写消息发送者


import com.alibaba.fastjson.JSONObject;

import com.jeesite.modules.sp.entity.SpAct;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.AmqpException;

import org.springframework.amqp.core.Message;

import org.springframework.amqp.core.MessagePostProcessor;

import org.springframework.amqp.rabbit.core.RabbitTemplate;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Service;

import java.util.Date;

 

 

@Service

public class MQActSender {

 

private static Logger log = LoggerFactory.getLogger(MQActSender.class);

 

@Autowired

RabbitTemplate rabbitTemplate;

 

//发送延时消息

public void sendDelayCancelAct(Object message){

    log.info("进入活动消息生产者-----------sendDelayCancelAct---------------发送活动延时消息:"+message.toString());

    SpAct spAct = (SpAct) message;

    JSONObject jsonObject = new JSONObject();

    jsonObject.put("test","测试发送消息到死信队列");

    String time = "60000";//延时时间,这样得到的值是毫秒级,rabbitMQ默认是毫秒

    log.info("----sendDelayCancelAct---------------------------活动消息延迟时间:"+time);

    rabbitTemplate.convertAndSend(MQConfig.ACT_DEAD_LETTER_EXCHANGE, MQConfig.ACT_DELAY_ROUTING_KEY, jsonObject,

            new MessagePostProcessor() {

                @Override

                public Message postProcessMessage(Message message) throws AmqpException {

                    //设置消息持久化

                    //message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

                    message.getMessageProperties().setExpiration(time);

                    return message;

                }

            });

}

四.编写消息消费者


import com.alibaba.fastjson.JSONObject;

import com.jeesite.modules.sp.api.constant.Dict;

import com.jeesite.modules.sp.entity.SpAct;

import com.jeesite.modules.sp.entity.SpActEnter;

import com.jeesite.modules.sp.service.SpActEnterService;

import com.jeesite.modules.sp.service.SpActService;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import org.springframework.amqp.rabbit.annotation.RabbitListener;

import org.springframework.beans.factory.annotation.Autowired;

import org.springframework.stereotype.Component;

import org.springframework.stereotype.Service;

import java.util.Date;

 

 

@Service

@Component

public class MQActReceiver {

 

@Autowired

private SpActService spActService;

 

@Autowired

private SpActEnterService spActEnterService;

 

 

private static Logger log = LoggerFactory.getLogger(MQActReceiver.class);

 

@RabbitListener(queues  = MQConfig.ACT_QUEUE)

public void receive(JSONObject message){

    log.info("进入消息消费者----------------------------监听活动消息 = " + message.toJSONString());

    try {

        String test= message.getString("test").toString();

        log.info("---------------------------MQActReceiver--------监听到的消息:"+test

    } catch (Exception e) {

        e.printStackTrace();

    }

 

}

到此,基本就实现了延时发送,但由于队列先进先出的问题,会出现消息阻塞的问题,例如A消息是10分钟,即使B消息是5分钟,那么B消息也只能等A消息消费以后才能被消费。


大致流程如下图:

死信队列是没有自己的消费者的。


20201026175850546.png

五、后记(该方法的缺陷)


RabbitMq TTL 队列过期时间长的消息阻塞后面过期时间短的消息


利用RabbitMq的TTL time-to-live 来实现延时消费。


延时时间设置到每条消息上的。而不是给队列的。


实现方式为消息存活时间为动态用户页面可配置的。


这就导致了一个问题:


先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。


结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。


原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。


它不会检测每一条消息是否过期。而是顺序检测。


如果first in的消息过期时间很长,会导致它阻塞后进的消息。


不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。


当先进的这条消息过期转发时,后面的消息会瞬间都被转发。造成消费延时过期消息的队列瞬间被转发进来大量消息。


造成消费压力。


所以rabbitmq ttl适用于过期时间一致的延时消费。


对于动态的延时可变的情况,还是考虑定时任务实现。