Spring Boot 与消息队列(以RabbitMQ为例)
在Docker中安装
1 | docker pull rabbitmq-3-rc-management |
新建容器运行
1 | docker run -p 5673:5672 -p 15673:15672 --name ogic-rabbitmq rabbitmq-3-rc-management |
进入RabbitMQ管理网页
在浏览器中打开localhost:15673
进入管理页面
默认帐号密码都是guest
,登录成功后会显示管理页面
Exchanges
交换器用于给绑定的队列发送消息,交换器有四种类型
direct
匹配
Routing key
,只给完全一致的队列发送消息fanout
给绑定的所有队列发送消息
headers
使用
header
匹配一个或多个Routing key
,给匹配的队列发送消息topic
匹配
Routing key
并接受通配符,给合适的队列发送消息
Queues
队列有Classic
和Quorum
类型,Classic
类型可设置是否可持久化。
在Spring Boot中使用
在pom.xml
中引入依赖
1 | <dependency> |
引入依赖后会默认使用RabbitAutoConfiguration
来进行RabbitMQ的自动配置,它会自动配置ConnectionFactory
来连接RabbitMQ,默认使用地址和端口是localhost:5672
,帐号和密码都用的guset
,如果要修改配置到application.properties
或application.yml
中修改,如:
1 | spring: |
自动配置会将这些RabbitMQ的配置封装到RabbitProperties
中,然后建一个RabbitTemplate
到容器中来给用户使用来发送和接收信息、一个AmqpAdmin
组件来管理RabbitMQ系统功能(包括创建交换器和队列)
rabbitTemplate
使用自动注入来获得该组件
1 |
|
send
1 | rabbitTemplate.send("exchange","routingKey",message); |
这里exchange
和routingKey
用的都是字符串,具体的名字由你创建时定义,在调用方法时也有不用输入这两个参数的方法send(Message var1)
,但是要自己先调用setExchange
和setRoutingKey
方法进行设置,默认是""
。
message
用的是Message
类型,使用时要自己创建:
1 | Message message = new Message(body,messageProperties); |
Message
的构造方法要传入一个byte[]
和MessageProperties
,即要自己进行序列化和创建messageProperties
封装成Message
再发送:
1 | byte[] body = "hello".getBytes(); |
convertAndSend
1 | rabbitTemplate.convertAndSend("exchange","routingKey",object); |
convertAndSend
方法方便很多,你同样要指定exchange
和routingKey
,也同样可以通过设置默认交换器和路径省略。但是这里传入的信息类型变成了一个object
,即我们可以传入任意类型,默认使用java.io.Serializable
来进行序列化和反序列化。
如果要改变序列化器,例如使用Json来序列化,可以在配置类中往容器中注入自己的序列化器:
1 |
|
receive
1 | rabbitTemplate.receive("queueName",timeoutMillis); |
输入参数是队列名和时间限制,返回类型是Message
receiveAndConvert
1 | rabbitTemplate.receiveAndConvert("queueName",timeoutMillis); |
输入参数也是队列名和时间限制,返回类型是Object
监听事件
使用@EnableRabbit
来开启这一功能(基于注解的RabbitMQ模式)
在方法中使用@RabbitListener
来监听队列,例如:
1 | "ogic","ogic.news"}) (queues = { |
AmqpAdmin
使用AmqpAdmin
来创建队列、交换器和进行绑定
自动注入:
1 |
|
1 | public void addExchange(){ |
创建队列和绑定的方式类似。