介绍

MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。可以将它理解成邮局,发送者将消息传递到邮局,然后由邮局帮我们发送给具体的消息接收者(消费者),具体发送过程与时间我们无需关心,它也不会干扰我进行其它事情。常见的MQ有kafka、activemq、zeromq、rabbitmq 等等,各大MQ的对比和优劣势可以自行Google。

基本概念

  • Broker:简单来说就是消息队列服务器实体

  • Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列

  • Queue:消息队列载体,每个消息都会被投入到一个或多个队列

  • Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来

  • Routing Key:路由关键字,exchange根据这个关键字进行消息投递

  • vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离

  • producer:消息生产者,就是投递消息的程序

  • consumer:消息消费者,就是接受消息的程序

  • channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务

案例

1.导入依赖

pom.xml

//加多如下依赖,当然web环境的依赖也要有
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
  <groupId>com.alibaba</groupId>
  <artifactId>fastjson</artifactId>
  <version>1.2.46</version>
</dependency>

2.配置属性

application-dev.properties

//这里是dev环境的配置,不是主配置文件!!! 主配置文件引用一下这个就好。
server.port=8080

spring.rabbitmq.username=liuliu
spring.rabbitmq.password=123456
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=liuliu
# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual

3.定义队列

RabbitConfig.java

package com.benzhu.xyz.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    public static final String DEFAULT_BOOK_QUEUE = "demo1";
    public static final String MANUAL_BOOK_QUEUE = "demo2";

    @Bean
    public Queue defaultBookQueue() {
// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
        return new Queue(DEFAULT_BOOK_QUEUE, true);
    }

    @Bean
    public Queue manualBookQueue() {
// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
        return new Queue(MANUAL_BOOK_QUEUE, true);
    }
}

4.定义一个实体类

Book.java

package com.benzhu.xyz.bean;

public class Book implements java.io.Serializable{
    //注意要序列化
    private static final long serialVersionUID = -4752635028290410893L;
    private String id;
    private String name;
    public String getId() {
        return id;
    }
    public void setId(String id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public Book(String id, String name) {
        super();
        this.id = id;
        this.name = name;
    }
    public Book() {
        super();
    }
    @Override
    public String toString() {
        return "Book [id=" + id + ", name=" + name + "]";
    }

}

5.写一个控制器

BookController.java

package com.benzhu.xyz.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;

@RestController
@RequestMapping(value = "/books")
public class BookController {
    private final RabbitTemplate rabbitTemplate;

    @Autowired
    public BookController(RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
    }

    /**
     * this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerAutoAck}
     * this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerManualAck}
     */
    @GetMapping
    public void defaultMessage() {
        Book book = new Book();
        book.setId("1");
        book.setName("一起来学Spring Boot");
        System.out.println(book);
        this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book);
        this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book);
    }
}

6.消息消费者

BookHandler.java

package com.benzhu.xyz.handler;

import java.io.IOException;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;
import com.rabbitmq.client.Channel;

@Component
public class BookHandler {

    @RabbitListener(queues = {RabbitConfig.DEFAULT_BOOK_QUEUE})
    public void listenerAutoAck(Book book, Message message, Channel channel) {
// TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
        final long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("demo1"+book);
// TODO 通知 MQ 消息已被成功消费,可以ACK了
            channel.basicAck(deliveryTag, false);
        } catch (IOException e) {
            try {
// TODO 处理失败,重新压入MQ
                channel.basicRecover();
            } catch (IOException e1) {
                e1.printStackTrace();
            }
        }
    }

    @RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})
    public void listenerManualAck(Book book, Message message, Channel channel) {
        System.out.println("demo2"+book);
        try {
// TODO 通知 MQ 消息已被成功消费,可以ACK了
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
// TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
        }
    }
}

7.运行主函数

BenzhuApplication.java

package com.benzhu.xyz;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BenzhuApplication {
    //这个因为每个人创建的Spring Boot项目名不一样所以也会不一样,不用变动即可。
    public static void main(String[] args) {
        SpringApplication.run(BenzhuApplication.class, args);
    }
}

8.运行测试

打开浏览器输入:http://localhost:8080/books访问。运行时控制台输入如下内容即为正常。