Spring-Boot-Message-Queue

Spring Boot 与消息队列(以RabbitMQ为例)


在Docker中安装

1
docker pull rabbitmq-3-rc-management

新建容器运行

1
docker run -p 5673:5672 -p 15673:15672 --name ogic-rabbitmq rabbitmq-3-rc-management

进入RabbitMQ管理网页

在浏览器中打开localhost:15673进入管理页面

默认帐号密码都是guest,登录成功后会显示管理页面

Exchanges

交换器用于给绑定的队列发送消息,交换器有四种类型

  1. direct

    匹配Routing key,只给完全一致的队列发送消息

  2. fanout

    给绑定的所有队列发送消息

  3. headers

    使用header匹配一个或多个Routing key,给匹配的队列发送消息

  4. topic

    匹配Routing key并接受通配符,给合适的队列发送消息

Queues

队列有ClassicQuorum类型,Classic类型可设置是否可持久化。

在Spring Boot中使用

pom.xml中引入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

引入依赖后会默认使用RabbitAutoConfiguration来进行RabbitMQ的自动配置,它会自动配置ConnectionFactory来连接RabbitMQ,默认使用地址和端口是localhost:5672,帐号和密码都用的guset,如果要修改配置到application.propertiesapplication.yml中修改,如:

1
2
3
4
5
6
spring:
rabbitmq:
host: [你的IP地址]
port: [你的端口号]
username: [你的帐号]
password: [密码]

自动配置会将这些RabbitMQ的配置封装到RabbitProperties中,然后建一个RabbitTemplate到容器中来给用户使用来发送和接收信息、一个AmqpAdmin组件来管理RabbitMQ系统功能(包括创建交换器和队列)

rabbitTemplate

使用自动注入来获得该组件

1
2
@Autowired
RabbitTemplate rabbitTemplate;

send

1
rabbitTemplate.send("exchange","routingKey",message);

这里exchangeroutingKey用的都是字符串,具体的名字由你创建时定义,在调用方法时也有不用输入这两个参数的方法send(Message var1),但是要自己先调用setExchangesetRoutingKey方法进行设置,默认是""

message用的是Message类型,使用时要自己创建:

1
Message message = new Message(body,messageProperties);

Message的构造方法要传入一个byte[]MessageProperties,即要自己进行序列化和创建messageProperties封装成Message再发送:

1
2
byte[] body = "hello".getBytes();
MessageProperties messageProperties = new MessageProperties();

convertAndSend

1
rabbitTemplate.convertAndSend("exchange","routingKey",object);

convertAndSend方法方便很多,你同样要指定exchangeroutingKey,也同样可以通过设置默认交换器和路径省略。但是这里传入的信息类型变成了一个object,即我们可以传入任意类型,默认使用java.io.Serializable来进行序列化和反序列化。

如果要改变序列化器,例如使用Json来序列化,可以在配置类中往容器中注入自己的序列化器:

1
2
3
4
@Bean
MessageConverter setDeafultMessageConverter(){
return new Jackson2JsonMessageConverter();
}

receive

1
rabbitTemplate.receive("queueName",timeoutMillis);

输入参数是队列名和时间限制,返回类型是Message

receiveAndConvert

1
rabbitTemplate.receiveAndConvert("queueName",timeoutMillis);

输入参数也是队列名和时间限制,返回类型是Object

监听事件

使用@EnableRabbit来开启这一功能(基于注解的RabbitMQ模式)

在方法中使用@RabbitListener来监听队列,例如:

1
2
3
4
@RabbitListener(queues = {"ogic","ogic.news"})
public void receive(Book book){
logger.info(book.toString());
}

AmqpAdmin

使用AmqpAdmin来创建队列、交换器和进行绑定

自动注入:

1
2
@Autowired
AmqpAdmin amqpAdmin;
1
2
3
4
public void addExchange(){
Exchange exchange = ExchangeBuilder.directExchange("book.direct").build();
amqpAdmin.declareExchange(exchange);
}

创建队列和绑定的方式类似。