基础概念

1.Queue

队列,用于储存消息,先入先出,prefetchCount限制平分给消费者的消息个数。

2.Exchange

交换机,生产者生产的消息先经过交换机,再路由到一个或多个Queue,这个过程通过binding key完成。

Exchange交换类别

  • fanout:会把所有发到Exchange的消息路由到所有和它绑定的Queue

  • direct:会把消息路由到routing key和binding key完全相同的Queue,不相同的丢弃

  • topic:direct是严格匹配,那么topic就算模糊匹配,routing key和binding key都用.来区分单词串,比如A.B.C,匹配任意单词,#匹配任意多个或0个单词,比如。B.*可以匹配到A.B.C

  • headers:不依赖routing key和binding key,通过对比消息属性中的headers属性,对比Exchange和Queue绑定时指定的键值对,相同就路由过来

3.实现思路

RabbitMQ队列本身是没有直接实现支持延迟队列的功能,但可以通过它的Time-To-Live Extensions 与 Dead Letter Exchange 的特性模拟出延迟队列的功能。

4.能干什么

订单业务: 在电商/点餐中,都有下单后 30 分钟内没有付款,就自动取消订单。

短信通知: 下单成功后 60s 之后给用户发送短信通知。

失败重试: 业务操作失败后,间隔一定的时间进行失败重试。

5.ACK

/消息的标识,false只确认当前一个消息收到,true确认所有consumer获得的消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
//ack返回false,并重新回到队列,api里面解释得很清楚
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
//拒绝消息
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

6.消息加入队列的回调

RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setMandatory(true);
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> System.out.println(“消息发送成功回调!”));
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> System.out.println(“消息发送失败回调!”));

延迟队列方法一(使用默认RabbitMQ)

1.导入Maven

pom.xml

//加多如下依赖,当然web环境的依赖也要有
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.46</version>
</dependency>

2.配置属性

application-dev.properties

//这里是dev环境的配置,不是主配置文件!!! 主配置文件引用一下这个就好。
server.port=8080

spring.rabbitmq.username=liuliu
spring.rabbitmq.password=123456
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=liuliu
# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3.定义队列

RabbitConfig.java

package com.benzhu.xyz.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    /**
     *
     * 配置RabbitMQ
     */
    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
        return rabbitTemplate;
    }

    /**
     * 延迟队列的名称
     */
    private static final String REGISTER_DELAY_QUEUE = "demo3";
    /**
     * DLX,dead letter发送到的 exchange
     * TODO 此处的 exchange 很重要,具体消息就是发送到该交换机的
     */
    public static final String REGISTER_DELAY_EXCHANGE = "exchange";
    /**
     * routing key 名称
     * TODO 此处的 routingKey 很重要,具体消息发送在该 routingKey 的
     */
    public static final String DELAY_ROUTING_KEY = "";

    /**
     * 底下就是配置实际消费的队列
     */
    public static final String REGISTER_QUEUE_NAME = "demo4";
    public static final String REGISTER_EXCHANGE_NAME = "demo4.exchange";
    public static final String ROUTING_KEY = "all";

    /**
     * 延迟队列配置
     * 1、params.put("x-message-ttl", 5 * 1000);
     * TODO 第一种方式是直接设置 Queue 延迟时间 但如果直接给队列设置过期时间,这种做法不是很灵活,(当然二者是兼容的,默认是时间小的优先)
     * 2、rabbitTemplate.convertAndSend(book, message -> {
     * message.getMessageProperties().setExpiration(2 * 1000 + "");
     * return message;
     * });
     * TODO 第二种就是每次发送消息动态设置延迟时间,这样我们可以灵活控制
     **/
    @Bean
    public Queue delayProcessQueue() {
        Map<String, Object> params = new HashMap<>();
// x-dead-letter-exchange 声明了队列里的死信转发到的交换机名称,
        params.put("x-dead-letter-exchange", REGISTER_EXCHANGE_NAME);
// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。
        params.put("x-dead-letter-routing-key", ROUTING_KEY);
        params.put("x-delayed-type", "direct");
        return new Queue(REGISTER_DELAY_QUEUE, true, false, false, params);
    }

    /**
     * 需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
     * 这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。
     * TODO 创建一个交换机
     */
    @Bean
    public DirectExchange delayExchange() {
        return new DirectExchange(REGISTER_DELAY_EXCHANGE);
    }

    /**
     *
     * TODO 将交换机和延迟队列绑定,需要指定routing key
     */
    @Bean
    public Binding dlxBinding() {
        return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY);
    }

    /**
     * TODO 创建实际消费的队列
     */
    @Bean
    public Queue registerBookQueue() {
        return new Queue(REGISTER_QUEUE_NAME, true);
    }

    /**
     * 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
     * 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
     **/
    @Bean
    public TopicExchange registerBookTopicExchange() {
//死信转发到的交换机与模式匹配
        return new TopicExchange(REGISTER_EXCHANGE_NAME);
    }

    @Bean
    public Binding registerBookBinding() {
// TODO 如果要让实际消费的队列和交换机之间有关联,这里的 routingKey 和 绑定的交换机很关键
        return BindingBuilder.bind(registerBookQueue()).to(registerBookTopicExchange()).with(ROUTING_KEY);
    }
}

4.定义一个实体类

Book.java

package com.benzhu.xyz.bean;

public class Book implements java.io.Serializable{
    //注意要序列化
    private static final long serialVersionUID = -4752635028290410893L;
    private String id;
    private String name;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Book(String id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public Book() {
        super();
    }
    @Override
    public String toString() {
        return "Book [id=" + id + ", name=" + name + "]";
    }
}

5.写一个控制器

BookController.java

package com.benzhu.xyz.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;

@RestController
@RequestMapping(value = "/books")
public class BookController {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public BookController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    @GetMapping
    public void defaultMessage() {
        Book book = new Book();
        book.setId("1");
        book.setName("一起来学Spring Boot");
        this.rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> System.out.println("消息发送成功回调!"));
        this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> System.out.println("消息发送失败回调!"));
        this.rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, book, message -> {
// TODO 第一句是可要可不要,根据自己需要自行处理

// TODO 如果配置了 params.put("x-message-ttl", 5 * 1000); 那么这一句也可以省略,具体根据业务需要是声明 Queue 的时候就指定好延迟时间还是在发送自己控制时间
            message.getMessageProperties().setExpiration(10 * 1000 + "");
            return message;
        });
        book.setName("学个鬼Spring Boot");
        this.rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE, RabbitConfig.DELAY_ROUTING_KEY, book, message -> {
            message.getMessageProperties().setExpiration(1 * 1000 + "");
            return message;
        });
    }

6.创建消费者

BookHandler.java

package com.benzhu.xyz.handler;

import java.io.IOException;
import java.time.LocalDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;
import com.rabbitmq.client.Channel;

@Component
public class BookHandler {

    private static final Logger log = LoggerFactory.getLogger(BookHandler.class);

    @RabbitListener(queues = {RabbitConfig.REGISTER_QUEUE_NAME})
    public void listenerDelayQueue(Book book, Message message, Channel channel) {
        log.info("[listenerDelayQueue 监听的消息] – [消费时间] – [{}] – [{}]", LocalDateTime.now(), book.toString());
        try {
// TODO 通知 MQ 消息已被成功消费,可以ACK了
            System.out.println("MQ 消息已被成功消费");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
// TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
        }
    }
}

7.运行主函数

BenzhuApplication.java

package com.benzhu.xyz;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BenzhuApplication {
    //这个因为每个人创建的Spring Boot项目名不一样所以也会不一样,不用变动即可。
    public static void main(String[] args) {
        SpringApplication.run(BenzhuApplication.class, args);
    }

}

8.运行测试

打开浏览器输入:http://localhost:8080/books访问。运行时控制台输出如下内容即为正常。

9.缺点

由上面的方法可以看到此延迟队列的缺点就是当不同延迟时间的消息进入队列时,因为先进先出的原则,所以导致比较短延迟的消息被阻塞,一直等到长延迟的消息运行完毕才运行。所以我们运用到了插件的方法二。

方法二(使用插件)

1.下载rabbitmq_delayed_meaage_exchange

官网下载地址:https://www.rabbitmq.com/community-plugins.html

rabbitmq_delayed_message_exchange-20171201-3.7.x:本地下载

2.安装rabbitmq_delayed_meaage_exchange

  • 解压下载的安装包,把rabbitmq_delayed_message_exchange-20171201-3.7.x.ez文件放到安装目录的RabbitMQ Server\rabbitmq_server-3.7.15\plugins里面。

  • 进入RabbitMQ Server\rabbitmq_server-3.7.15\sbin目录在地址栏打上cmd,打开cmd。

  • 输入rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令安装插件。

  • 安装成功如下图,记得要重启。

3.查看插件

在cmd中输入rabbitmq-plugins list,看到标红的即为插件安装成功。

4.导入Maven

pom.xml

//加多如下依赖,当然web环境的依赖也要有
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

5.配置属性

application-dev.properties

//这里是dev环境的配置,不是主配置文件!!! 主配置文件引用一下这个就好。
server.port=8080
spring.rabbitmq.username=liuliu
spring.rabbitmq.password=123456
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=liuliu
spring.rabbitmq.connection-timeout=15000
#采用消息确认模式,消息发出去后,异步等待响应
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true

#消费端配置
#消费者监听并发数
spring.rabbitmq.listener.simple.concurrency=10
#最大并发数
spring.rabbitmq.listener.simple.max-concurrency=20
#签收模式 推荐使用manual手工签收
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#限流
spring.rabbitmq.listener.simple.prefetch=5

6.定义队列

RabbitConfig.java”

package com.benzhu.xyz.config;

import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

    //交换机名字
    public static final String DELAY_EXCHANGE_NAME = "demo5.exchange";
    //队列名字
    public static final String DELAY_QUEUE_NAME = "demo5.queue";
    //ROUTING_KRY
    public static final String ROUTING_KRY = "demo5";

    /**
     * 声明一个延迟队列
     * @return
     */
    @Bean
    Queue delayQueue(){
        return QueueBuilder.durable(DELAY_QUEUE_NAME).build();
    }
    /**
     * 声明一个交换机
     * @return
     */
    @Bean
    CustomExchange delayExchange(){

        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", true,false, args);

    }
    /**
     * 绑定
     * @param delayQueue
     * @param delayExchange
     * @return
     */
    @Bean
    Binding queueBinding(Queue delayQueue, CustomExchange delayExchange){
        return BindingBuilder.bind(delayQueue).to(delayExchange).with(ROUTING_KRY).noargs();
    }
}

7.如方法一的第四步定义实体类

8.写一个控制器

BookController.java

package com.benzhu.xyz.controller;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.core.RabbitTemplate.ConfirmCallback;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;

@RestController
@RequestMapping(value = "/books")
public class BookController {
    private final RabbitTemplate rabbitTemplate;

    /**
     * 构造方法
     * @param
     */
    @Autowired
    public BookController(CachingConnectionFactory connectionFactory) {
        this.rabbitTemplate = new RabbitTemplate(connectionFactory);
        System.out.println("BookController");
    }

    /**
     * 生产者回调函数:confirm确认消息投递成功
     */
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {

            String messageId = correlationData.getId();
            if (ack) {
                System.out.println("消息投递成功:"+messageId);
//进行消息记录的数据库更新

            }else{
                System.out.println("消息投递失败");
            }

        }
    };

    @GetMapping
    public void defaultMessage() {
        Book book = new Book();
        book.setId("1");
        book.setName("一起来学Spring Boot");
        this.rabbitTemplate.setConfirmCallback(confirmCallback);
        this.rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> System.out.println("消息发送失败回调!"));
        rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME,RabbitConfig.ROUTING_KRY, book,(message)->{
            message.getMessageProperties().setHeader("x-delay", 5000);//设置延迟时间
            return message;
        },new CorrelationData("123"));
        book.setName("学个鬼Spring Boot");
        rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME,RabbitConfig.ROUTING_KRY, book,(message)->{
            message.getMessageProperties().setHeader("x-delay", 1000);//设置延迟时间
            return message;
        },new CorrelationData("1234"));
        book.setName("Spring Boot");
        rabbitTemplate.convertAndSend(RabbitConfig.DELAY_EXCHANGE_NAME,RabbitConfig.ROUTING_KRY, book,(message)->{
            message.getMessageProperties().setHeader("x-delay", 10000);//设置延迟时间
            return message;
        },new CorrelationData("12345"));
    }
}

9.创建消费者

BookHandler.java

package com.benzhu.xyz.handler;

import java.io.IOException;
import java.time.LocalDateTime;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;
import com.rabbitmq.client.Channel;

@Component
public class BookHandler {

    @RabbitListener(queues = RabbitConfig.DELAY_QUEUE_NAME)
    @RabbitHandler
    public void receive( Book book,Message message,Channel channel) {

        System.out.println("接收到的消息:"+book.toString() +"||"+LocalDateTime.now());
        try {
// TODO 通知 MQ 消息已被成功消费,可以ACK了
            System.out.println("MQ 消息已被成功消费");
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
// TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
        }
    }
}

10.运行主函数

BenzhuApplication.java

package com.benzhu.xyz;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BenzhuApplication {
    //这个因为每个人创建的Spring Boot项目名不一样所以也会不一样,不用变动即可。
    public static void main(String[] args) {
        SpringApplication.run(BenzhuApplication.class, args);
    }
}

11.运行测试

打开浏览器输入:http://localhost:8080/books访问。运行时控制台输出如下内容。

12.备注

可以放到当我们放不同延迟的消息到交换机的时候,消息没有发生阻塞,每条消息都按照各自的延迟正常的输出。当然插件据说所定义的时间不能超过39天,不然会报错。可以放到当我们放不同延迟的消息到交换机的时候,消息没有发生阻塞,每条消息都按照各自的延迟正常的输出。当然插件据说所定义的时间不能超过39天,不然会报错。

参考地址

https://www.kancloud.cn/liquanqiang/spring-amqp-2/736932

https://blog.battcn.com/2018/05/23/springboot/v2-queue-rabbitmq-delay/

https://juejin.im/post/5a12ffd451882578da0d7b3a

https://blog.csdn.net/WayneLee0809/article/details/84643919