RabbitMQ的死信队列

/ java / 没有评论 / 153浏览

RabbitMQ的死信队列

死信队列介绍

消息变成死信有以下几种情况

死信的处理方式

死信的产生既然不可避免,那么就需要从实际的业务角度和场景出发,对这些死信进行后续的处理,常见的处理方式大致有下面几种

  1. 丢弃,如果不是很重要,可以选择丢弃
  2. 记录死信入库,然后做后续的业务分析或处理
  3. 通过死信队列,由负责监听死信的应用程序进行处理

综合来看,更常用的做法是第三种,即通过死信队列,将产生的死信通过程序的配置路由到指定的死信队列,然后应用监听死信队列,对接收到的死信做后续的处理 alt

死信队列演示

消息超时进入死信队列

这是一种在实际生产中应用场景比较多的一种方式,比如我们熟知的订单业务场景,当用户购买商品产生了一个订单的时候,可以设置过期时间,如果在这段时间内,消息还没有被消费,将会被路由到死信队列,专业术语来讲,即消息的TTL,TTL过期了消息将进入死信队列,下面是一段演示代码,这里包括两部分,生产者和消费者

rabbitmq的死信队列设置主要在参数argument中做配置,这里需要设置的有 x-dead-letter-exchange 和 x-message-ttl

producer代码 此处模拟生产者产生订单,推送到队列中,消息有效时间是10S,过了10S如果没有被消费将会被路由到死信队列

 public class Producer {

    public static void main(String[] args) throws Exception{

        final Channel channel = RabbitUtil.getChannel();
        String orderExchangeName = "order_exchange";
        String orderQueueName = "order_queue";
        String orderRoutingKey = "order.#";
        Map<String, Object> arguments = new HashMap<String, Object>(16);

        //死信队列配置  ----------------
        String dlxExchangeName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";
        String dlxRoutingKey = "#";

        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange",dlxExchangeName);
        // 设置队列中的消息 10s 钟后过期
        arguments.put("x-message-ttl", 10000);

        //正常的队列绑定
        channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
        channel.queueDeclare(orderQueueName, true, false, false, arguments);
        channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH🇲🇲ss").format(new Date()) + " 创建订单.";

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);

        channel.basicPublish(orderExchangeName, "order.save", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

        System.err.println("消息发送完成......");
    }

}

consumer代码 消费端监听的是死信队列,如果conusmer收到了消息,表明死信队列里面有消息了

public class Consumer {

    private static final String QUEUE_NAME = "dlx.queue";

    public static void main(String[] args) throws Exception{
        // 创建信道
        final Channel channel = RabbitUtil.getChannel();
        // 消费端消息限流。
        // 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收  此参数才会生效。
        channel.basicQos(64);

        System.out.println("消费者启动 ..........");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("死信队列接收到消息:" + new String(body));
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        channel.basicConsume(QUEUE_NAME, consumer);
        TimeUnit.SECONDS.sleep(10000000L);
    }

}

启动producer 消息发送成功,同时可以通过控制台看到,exhange和相关的队列也帮我们创建了,要注意的是在dlx.queue中,有一个消息就绪,很明显,消息过了10S中没有任何消费者消费,就被路由到了死信队列dlx.queue中 image.png RabbitMQ management image.png auoQEV.png

启动consumer 通过控制台打印结果,可以看到,由于消费端监听的是死信队列,已经从dlx.queue中成功获取到了这条信息 image.png

消息被拒绝,且requeue=false

就是在consumer端,当消费者要过滤某些消息的时候,那部分被过滤掉的消息如果不设置退回,这些消息就变成了死信,即在下面的代码中第三个参数设置成false即可

channel.basicNack(envelope.getDeliveryTag(),false,false);

peoducer端代码

/**
 * 生产者
 * 死信队列使用
 */
public class Producer {

    public static void main(String[] args) throws Exception{

        Channel channel = RabbitUtil.getChannel();
        String exchangeName = "test_ack_exchange";
        String routingKey = "ack.save";

        //通过在properties设置来标识消息的相关属性
        for(int i=0;i<5;i++){
            Map<String, Object> headers = new HashMap<String, Object>();
            headers.put("num",i);
            AMQP.BasicProperties properties = new AMQP.BasicProperties().builder()
                    .deliveryMode(2)                    // 传送方式 2:持久化投递
                    .contentEncoding("UTF-8")           // 编码方式
                    //.expiration("10000")              // 过期时间
                    .headers(headers)                  //自定义属性
                    .build();
            String message = "hello this is ack message ....."  + i;
            System.out.println(message);
            channel.basicPublish(exchangeName,routingKey,true,properties,message.getBytes());
        }

    }


}

consumer端代码

public class Consumer {

    public static void main(String[] args) throws Exception{

        final Channel channel = RabbitUtil.getChannel();
        String exchangeName = "test_ack_exchange";
        String exchangeType="topic";
        final String queueName = "test_ack_queue";
        String routingKey = "ack.#";

        //死信队列配置  ----------------
        String deadExchangeName = "dead_exchange";
        String deadQueueName = "dead_queue";
        String deadRoutingKey = "#";
        //死信队列配置  ----------------

        //如果需要将死信消息路由
        Map<String,Object> arguments = new HashMap<String, Object>();
        arguments.put("x-dead-letter-exchange",deadExchangeName);

        channel.exchangeDeclare(exchangeName,exchangeType,true,false,false,null);
        channel.queueDeclare(queueName,false,false,false,arguments);
        channel.queueBind(queueName,exchangeName,routingKey);

        //死信队列绑定配置  ----------------
        channel.exchangeDeclare(deadExchangeName,exchangeType,true,false,false,null);
        channel.queueDeclare(deadQueueName,true,false,false,null);
        channel.queueBind(deadQueueName,deadExchangeName,deadRoutingKey);
        //死信队列配置  ----------------

        System.out.println("consumer启动 .....");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try{
                    Thread.sleep(2000);
                }catch (Exception e){

                }
                Integer num = (Integer)properties.getHeaders().get("num");
                if(num==0){
                    //未被ack的消息,并且requeue=false。即nack的 消息不再被退回队列而成为死信队列
                    channel.basicNack(envelope.getDeliveryTag(),false,false);
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer端的Nack消息是: " + message);
                }else {
                    channel.basicAck(envelope.getDeliveryTag(),false);
                    String message = new String(body, "UTF-8");
                    System.out.println("consumer端的ack消息是: " + message);
                }
            }
        };
        //消息要能重回队列,需要设置autoAck的属性为false,即在回调函数中进行手动签收
        channel.basicConsume(queueName,false,consumer);
    }
}

要关注的即下面的这处代码和第三个参数 image.png

启动消费者 image.png

启动生产者 生产者成功发送5条消息 image.png

再看消费端的控制台,这里num=0的这条消息由于设置了死信队列而不会重回原来的队列,在上一篇中,当参数设置成了true的时候,看到控制台一直会打印一条消息 image.png

同时,通过控制台也可以发现,在dead_queue中,有一条消息为就绪状态了,即死信消息,但这里并没有对这条消息做处理,目前一直存在队列里面,可以根据实际应用做后续的处理 image.png 我们可以启动dead_queue的死信队列的消费者,就接受到了 image.png

队列达到最大长度

我们设置某个队列的最大可承载消息的数量是100个,超出第100个的消息将会被路由到死信队列中,设置消息队列的最大数量也是实际生产中作为队列限流的一种常规手段,具有实际的业务意义,下面是代码演示,基本设置和上述的TTL类似,只是在参数中将TTL更换为如下配置

arguments.put("x-max-length",3);

生产者代码 这里我们设定order_queue这个队列的容量是5个,但是我们在程序中设置的x-max-length=3,那么按照这个猜想,将会有两个消息被路由到死信队列

public class Producer {

    public static void main(String[] args) throws Exception{

        final Channel channel = RabbitUtil.getChannel();
        String orderExchangeName = "order_exchange";
        String orderQueueName = "order_queue";
        String orderRoutingKey = "order.#";
        Map<String, Object> arguments = new HashMap<String, Object>(16);

        //死信队列配置  ----------------
        String dlxExchangeName = "dlx.exchange";
        String dlxQueueName = "dlx.queue";
        String dlxRoutingKey = "#";

        // 为队列设置队列交换器
        arguments.put("x-dead-letter-exchange",dlxExchangeName);
        // 设置队列中的消息 10s 钟后过期
        //arguments.put("x-message-ttl", 10000);
        arguments.put("x-max-length",3);

        //正常的队列绑定
        channel.exchangeDeclare(orderExchangeName, "topic", true, false, null);
        channel.queueDeclare(orderQueueName, true, false, false, arguments);
        channel.queueBind(orderQueueName, orderExchangeName, orderRoutingKey);

        String message = new SimpleDateFormat("yyyy-MM-dd HH🇲🇲ss").format(new Date()) + " 创建订单.";

        // 创建死信交换器和队列
        channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
        channel.queueDeclare(dlxQueueName, true, false, false, null);
        channel.queueBind(dlxQueueName, dlxExchangeName, orderRoutingKey);

        for(int i=0;i<5;i++){
            message = message + "========> " + i ;
            System.out.println("发送的消息是:" + message);
            channel.basicPublish(orderExchangeName, "order.save",null, message.getBytes());
        }

        System.err.println("消息发送完成......");
    }

}

消费者代码

public class Consumer {

    private static final String QUEUE_NAME = "order_queue";

    public static void main(String[] args) throws Exception{
        // 创建信道
        final Channel channel = RabbitUtil.getChannel();
        // 消费端消息限流。
        // 设置客户端最多接收未被ack的消息个数, 只有消息 手动签收  此参数才会生效。
        //channel.basicQos(1);

        System.out.println("消费者启动 ..........");

        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.err.println("死信队列接收到消息:" + new String(body));
                System.err.println("deliveryTag:" + envelope.getDeliveryTag());
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };

        channel.basicConsume(QUEUE_NAME,false, consumer);
        //TimeUnit.SECONDS.sleep(10000000L);
    }

}

启动生产者 5条消息发送完毕 image.png

启动消费端 通过控制台可以看到,消费端只从order_queue中消费了3条消息,还剩2条消息去哪里了呢? image.png

我们再回到控制台观察一下,发现在dlx.queue这个死信队列中有两条就绪的消息,即剩下的2条消息被路由到了死信队列了 image.png 我们可以启动死信队列的消费者,就接受到了 image.png

demo

http://git.wingsky.net/kite/rabbitmq-dlx springboot版本:http://git.wingsky.net/kite/rabbitmq-dlx-springboot