消息队列RabbitMQ
- 生产者发消息只需要知道发给哪个交换机,设置对应的routingKey以及发送消息内容,同时还可以设置一些参数如消息的过期时间,是否持久化等
- 消费者只需要知道从哪个队列中接受消息,提供消费消息的方法和是否自动应答
- 队列的创建和交换机的创建可以由第三方创建,不需要在生产者和消费者的代码中涉及

简单模式以及工作队列模式
- 简单模式:生产者->消息队列<-消费者
- 工作队列模式:生产者->消息队列<-[消费者1,消费者2…]
class RabbitMQUtils{
public static Channel getChannel(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.86.72");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("rabbitmq");
connectionFacotry.setPort(5672);
Channel channel = connectionFactory.createChannel();
return channel;
}
}
public class producer{
public static void main(String[] args) throws Execption{
Channel channel = RabbitMQUtils.getChannel();
channel.confirmSelect();
channel.addConfirmListener((item1,item2)->{
System.out.println(item1);
},(item1,item2)->{
System.out.println(item1);
})
channel.queueDeclare("helloworld",true,false,false,null);
int batchsize =0;
for (int i=0; i<100; i++){
batchsize++;
channel.basicPublish("","helloworld",MessageProperties.PRESISTENT_TEXT_PLAIN,("message"+i).getBytes());
if (batchsize>=50){
if(channel.waitForConfirms()){
System.out.println("发布成功");
batchsize=0;
}
}
}
}
}
public class Consumer{
public static void main(String[] args){
Channel channel = RabbitMQUtils.getChannel();
channel.basicQos(1);
channel.basicComsume("helloworld",false,(item1,item2)->{
System.out.println("消息接受成功:" + new String(item2.getBody()));
Thread.sleep(1000);
channel.basicAck(item2.getEnvelope().getDeliverTag(),false);
},item->{
System.out.println(item+"消费中断");
});
}
}
发布订阅模式
- 广播模式(fanout)
- 直接模式(direct)
- 主题模式(topic)
广播模式(fanout)
- 只需要一个交换机,然后生产者发送消息给交换机,不需要指定routingKey,消费者自己创建随机的队列并绑定到交换机上也不需要指定routingKey,然后就可以接受到消息。
class RabbitMQUtils{
public static Channel getChannel(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.86.72");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("rabbitmq");
connectionFacotry.setPort(5672);
Channel channel = connectionFactory.createChannel();
return channel;
}
}
public class producer{
public static void main(String[] args) throws Execption{
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare("exchange_name","fanout");
for (int i=0; i<100; i++){
channel.basicPublish("exchange_name","",null,("message"+i).getBytes());
}
}
}
public class Consumer{
public static void main(String[] args){
Channel channel = RabbitMQUtils.getChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"exchange_name","");
channel.basicComsume(queueName,true,(item1,item2)->{
System.out.println("消息接受成功:" + new String(item2.getBody()));
},item->{
System.out.println(item+"消费中断");
});
}
}
直接模式(direct)
- 相比与广播模式,直接模式需要指定routingKey,生产者发送消息给交换机的时候需要指定routingKey,然后交换机根据routingKey发送给对应的消费者的队列。消费者还是可以创建随机的队列,然后将队列绑定到交换机上同时在绑定的时候指定对应的routingKey
class RabbitMQUtils{
public static Channel getChannel(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.86.72");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("rabbitmq");
connectionFacotry.setPort(5672);
Channel channel = connectionFactory.createChannel();
return channel;
}
}
public class producer{
public static void main(String[] args) throws Execption{
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare("exchange_name","direct");
for (int i=0; i<100; i++){
channel.basicPublish("exchange_name","white",null,("white"+i).getBytes());
channel.basicPublish("exchange_name","black",null,("black"+i).getBytes());
channel.basicPublish("exchange_name","grey",null,("grey"+i).getBytes());
}
}
}
public class Consumer1{
public static void main(String[] args){
Channel channel = RabbitMQUtils.getChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"exchange_name","black");
channel.queueBind(queueName,"exchange_name","grey");
channel.basicComsume(queueName,true,(item1,item2)->{
System.out.println("消息接受成功:" + new String(item2.getBody()));
},item->{
System.out.println(item+"消费中断");
});
}
}
public class Consumer2{
public static void main(String[] args){
Channel channel = RabbitMQUtils.getChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"exchange_name","white");
channel.basicComsume(queueName,true,(item1,item2)->{
System.out.println("消息接受成功:" + new String(item2.getBody()));
},item->{
System.out.println(item+"消费中断");
});
}
}
主题模式(topic)
- 主题模式相比于直接模式,直接模式的routingKey是写死的,只有完全匹配才会接收消息,而主题模式的routingKey可以适用通配符,只要和通配符相匹配就可以接受到消息
- 消息主题的通配符有两种,*代表一个单词,#代表0个单词或多个单词,主题模式的routingKey可以由多个单词组成,但是每个单词必须要通过
.
号分隔开
- 主题模式下的队列的routingKey,如果没有
*
或者#
则变为直接模式,如果只有一个#
则变为广播模式
class RabbitMQUtils{
public static Channel getChannel(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.86.72");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("rabbitmq");
connectionFacotry.setPort(5672);
Channel channel = connectionFactory.createChannel();
return channel;
}
}
public class producer{
public static void main(String[] args) throws Execption{
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare("exchange_name","topic");
for (int i=0; i<100; i++){
channel.basicPublish("exchange_name","white.white1",null,("white1"+i).getBytes());
channel.basicPublish("exchange_name","white.white2",null,("white2"+i).getBytes());
channel.basicPublish("exchange_name","white.white3",null,("white3"+i).getBytes());
}
}
}
public class Consumer1{
public static void main(String[] args){
Channel channel = RabbitMQUtils.getChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,"exchange_name","white.#");
channel.basicComsume(queueName,true,(item1,item2)->{
System.out.println("消息接受成功:" + new String(item2.getBody()));
},item->{
System.out.println(item+"消费中断");
});
}
}
死信
- 死信队列绑定到正常的队列中,依赖于正常的队列,当正常队列出现消息过期,消息拒绝,队列已满,则消息会转发到死信交换机上
- 死信交换机可以绑定到多个正常的队列中,根据每个正常队列绑定死信交换机的不同路由键,将正常队列的死亡的消息转发到死信交换机上对应路由键的队列。

class RabbitMQUtils{
public static Channel getChannel(){
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.86.72");
connectionFactory.setUsername("rabbitmq");
connectionFactory.setPassword("rabbitmq");
connectionFacotry.setPort(5672);
Channel channel = connectionFactory.createChannel();
return channel;
}
}
public class producer{
public static void inithandler(){
Channel channel = RabbitMQUtils.getChannel();
channel.exchangeDeclare("nromal_exchange","direct");
channel.exchangeDeclare("dead_exchange","direct");
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange","dead_exchange");
map.put("x-dead-letter-routing-key","siwang");
map.put("x-max-length","6");
map.put("x-message-ttl","10000");
channel.queueDeclare("normal_queue",false,false,false,map);
channel.queueDeclare("dead_queue",false,false,false,null);
channel.queueBind("normal_queue","nromal_exchange","zhengchang");
channel.queueBind("dead_queue","dead_exchange","siwang");
}
public static void main(String[] args) throws Execption{
inithandler();
Channel channel = RabbitMQUtils.getChannel();
for (int i=0; i<10; i++){
channel.basicPublish("normal_queue","zhengchang",
new AMQP.BasicProperties().builder().expiration("10000").build(),
("test"+i).getBytes());
}
}
}
public class Consumer1{
public static void main(String[] args){
Channel channel = RabbitMQUtils.getChannel();
channel.basicComsume("normal_queue",false,(item1,item2)->{
if (new String(item2.getBody()).contains("test5")){
channel.basicReject(item2.getEnvelope().getDeliverTag(),false);
System.out.println("拒绝消息:" + new String(item2.getBody()));
}else{
System.out.println("正常消息接收成功:" + new String(item2.getBody()));
}
},item->{
System.out.println(item+"消费中断");
});
Channel channel1 = RabbitMQUtils.getChannel();
channel1.basicComsume("dead_queue",true,(item1,item2)->{
System.out.println("死信消息接受成功:" + new String(item2.getBody()));
},item->{
System.out.println(item+"消费中断");
});
}
}
延迟队列
- 所谓延迟队列就是在创建队列时设置队列的消息过期时间,过期之后发送到绑定的死信队列中,过期时间可以在队列创建的时候指定,也可以在发送消息的时候指定
- 整合SpringBoot:引入POM依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
spring:
rabbitmq:
host: 192.168.86.72
username: rabbitmq
password: rabbitmq
port: 5672
@RestController
public class PublishRest {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping(path = "publish")
public boolean publish(String exchange, String routing, String data) {
rabbitTemplate.convertAndSend(exchange, routing, data);
return true;
}
}
@RestController
public class PublishRest {
@GetMapping(path = "publish")
public boolean publish(String exchange, String routing, String data) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
rabbitTemplate.convertAndSend(exchange, routing, data);
return true;
}
}
@Configuration
public class RabbitConfig {
private final String EXCHANGE_NAME = "boot_topic_exchange";
private final String QUEUE_NAME = "boot_queue";
@Autowire
RabbitTemplate rabbitTemplate;
@Bean
public void createRabbitTemplate(ConnectionFactory connectionFactory){
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
System.out.println("ConfirmCallback: "+"相关数据:"+correlationData);
System.out.println("ConfirmCallback: "+"确认情况:"+ack);
System.out.println("ConfirmCallback: "+"原因:"+cause);
}
});
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("ReturnCallback: "+"消息:"+message);
System.out.println("ReturnCallback: "+"回应码:"+replyCode);
System.out.println("ReturnCallback: "+"回应信息:"+replyText);
System.out.println("ReturnCallback: "+"交换机:"+exchange);
System.out.println("ReturnCallback: "+"路由键:"+routingKey);
}
});
}
@Bean("bootExchange")
public Exchange getExchange()
{
return ExchangeBuilder
.topicExchange(EXCHANGE_NAME)
.durable(true)
.build();
}
@Bean("bootQueue")
public Queue getMessageQueue()
{
Map<String,Object> map = new HashMap<>();
map.put("x-dead-letter-exchange",EXCHANGE_NAME);
map.put("x-dead-routing-key","routingkey");
map.put("x-message-ttl",10000);
return new QueueBuilder.durable(QUEUE_NAME).withArguments(map).build();
}
@Bean
public Binding bindMessageQueue(@Qualifier("bootExchange") Exchange exchange, @Qualifier("bootQueue") Queue queue)
{
return BindingBuilder
.bind(queue)
.to(exchange)
.with("routingKey.*")
.noargs();
}
}
@Component
public class Consumer {
@RabbitListener(
bindings = @QueueBinding(value = @Queue(value = "topic.n3", durable = "false", autoDelete = "true"),
exchange = @Exchange(value = "topic.e", type = ExchangeTypes.TOPIC), key = "r"),
ackMode = "MANUAL")
public void consumerDoAck(String data, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("consumerDoAck: " + data);
if (data.contains("success")) {
channel.basicAck(deliveryTag, false);
} else {
channel.basicNack(deliveryTag, false, true);
}
}
}
备份交换机
- 当生产者发送消息到原始交换机上时发现此条消息指定的队列不存在,无法转发消息,这时此条消息就会转发到备份交换机
- 备份交换机绑定在原始交换机上,通过参数的形式进行绑定,与死信交换机不同,死信交换机是绑定在队列上。
- 如果备份交换机与消息回退回调函数同时存在,则以备份交换机为准