介绍
MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。可以将它理解成邮局,发送者将消息传递到邮局,然后由邮局帮我们发送给具体的消息接收者(消费者),具体发送过程与时间我们无需关心,它也不会干扰我进行其它事情。常见的MQ有kafka、activemq、zeromq、rabbitmq 等等,各大MQ的对比和优劣势可以自行Google。
基本概念
Broker:简单来说就是消息队列服务器实体
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
Queue:消息队列载体,每个消息都会被投入到一个或多个队列
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来
Routing Key:路由关键字,exchange根据这个关键字进行消息投递
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离
producer:消息生产者,就是投递消息的程序
consumer:消息消费者,就是接受消息的程序
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务
案例
1.导入依赖
pom.xml
//加多如下依赖,当然web环境的依赖也要有
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.46</version>
</dependency>
2.配置属性
application-dev.properties
//这里是dev环境的配置,不是主配置文件!!! 主配置文件引用一下这个就好。
server.port=8080
spring.rabbitmq.username=liuliu
spring.rabbitmq.password=123456
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=liuliu
# 手动ACK 不开启自动ACK模式,目的是防止报错后未正确处理消息丢失 默认 为 none
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3.定义队列
RabbitConfig.java
package com.benzhu.xyz.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
public static final String DEFAULT_BOOK_QUEUE = "demo1";
public static final String MANUAL_BOOK_QUEUE = "demo2";
@Bean
public Queue defaultBookQueue() {
// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
return new Queue(DEFAULT_BOOK_QUEUE, true);
}
@Bean
public Queue manualBookQueue() {
// 第一个是 QUEUE 的名字,第二个是消息是否需要持久化处理
return new Queue(MANUAL_BOOK_QUEUE, true);
}
}
4.定义一个实体类
Book.java
package com.benzhu.xyz.bean;
public class Book implements java.io.Serializable{
//注意要序列化
private static final long serialVersionUID = -4752635028290410893L;
private String id;
private String name;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Book(String id, String name) {
super();
this.id = id;
this.name = name;
}
public Book() {
super();
}
@Override
public String toString() {
return "Book [id=" + id + ", name=" + name + "]";
}
}
5.写一个控制器
BookController.java
package com.benzhu.xyz.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;
@RestController
@RequestMapping(value = "/books")
public class BookController {
private final RabbitTemplate rabbitTemplate;
@Autowired
public BookController(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
/**
* this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerAutoAck}
* this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book); 对应 {@link BookHandler#listenerManualAck}
*/
@GetMapping
public void defaultMessage() {
Book book = new Book();
book.setId("1");
book.setName("一起来学Spring Boot");
System.out.println(book);
this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE, book);
this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOK_QUEUE, book);
}
}
6.消息消费者
BookHandler.java
package com.benzhu.xyz.handler;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import com.benzhu.xyz.bean.Book;
import com.benzhu.xyz.config.RabbitConfig;
import com.rabbitmq.client.Channel;
@Component
public class BookHandler {
@RabbitListener(queues = {RabbitConfig.DEFAULT_BOOK_QUEUE})
public void listenerAutoAck(Book book, Message message, Channel channel) {
// TODO 如果手动ACK,消息会被监听消费,但是消息在队列中依旧存在,如果 未配置 acknowledge-mode 默认是会在消费完毕后自动ACK掉
final long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("demo1"+book);
// TODO 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
try {
// TODO 处理失败,重新压入MQ
channel.basicRecover();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
@RabbitListener(queues = {RabbitConfig.MANUAL_BOOK_QUEUE})
public void listenerManualAck(Book book, Message message, Channel channel) {
System.out.println("demo2"+book);
try {
// TODO 通知 MQ 消息已被成功消费,可以ACK了
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
// TODO 如果报错了,那么我们可以进行容错处理,比如转移当前消息进入其它队列
}
}
}
7.运行主函数
BenzhuApplication.java
package com.benzhu.xyz;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class BenzhuApplication {
//这个因为每个人创建的Spring Boot项目名不一样所以也会不一样,不用变动即可。
public static void main(String[] args) {
SpringApplication.run(BenzhuApplication.class, args);
}
}
8.运行测试
打开浏览器输入:http://localhost:8080/books访问。运行时控制台输入如下内容即为正常。
评论