详解RabbitMQ高级特性之发送方确认机制

article/2025/6/18 9:12:45

目录

发送方确认

添加配置

常量类

声明队列和交换机并绑定二者关系

confirm确认模式 

编写生产消息代码

生产消息1

解决方法

多次生产消息2

解决方法

生产消息3

return 模式

编写生产消息代码(路由正确)

生产消息1

编写生产消息代码(路由错误)

生产消息2

面试题


发送方确认

在使⽤ RabbitMQ的时候, 可以通过消息持久化来解决因为服务器的异常崩溃⽽导致的消息丢失, 但是还有⼀个问题, 当消息的⽣产者将消息发送出去之后, 消息到底有没有正确地到达服务器呢? 如果在消息到达服务器之前已经丢失(⽐如RabbitMQ重启, 那么RabbitMQ重启期间⽣产者消息投递失败), 持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?

RabbitMQ为我们提供了两种解决⽅案:

a. 通过事务机制实现
b. 通过发送⽅确认(publisher confirm) 机制实现

事务机制⽐较消耗性能, 在实际⼯作中使⽤也不多, 下面主要介绍confirm机制来实现发送⽅的确认.

RabbitMQ为我们提供了两个⽅式来控制消息的可靠性投递:

1. confirm确认模式
2. return退回模式

添加配置
spring:application:name: rabbit-extensions-demorabbitmq:addresses: amqp://study:study@47.98.109.138:5672/extensionpublisher-confirm-type: correlated   #消息发送确认
常量类
public class Constants {//发送方确认public static final String CONFIRM_QUEUE = "confirm.queue";public static final String CONFIRM_EXCHANGE = "confirm.exchange";
}
声明队列和交换机并绑定二者关系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import rabbitextensionsdemo.constant.Constants;@Configuration
public class RabbitMQConfig {//发送方确认@Bean("confirmQueue")public Queue confirmQueue(){return QueueBuilder.durable(Constants.CONFIRM_QUEUE).build();}@Bean("confirmExchange")public DirectExchange confirmExchange(){return ExchangeBuilder.directExchange(Constants.CONFIRM_EXCHANGE).build();}@Bean("confirmBinding")public Binding confirmBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("confirm").noargs();}
}
confirm确认模式 

Producer 在发送消息的时候, 对发送端设置⼀个ConfirmCallback的监听, ⽆论消息是否到达
Exchange, 这个监听都会被执⾏, 如果Exchange成功收到, ACK( Acknowledge character , 确认字符)为true, 如果没收到消息, ACK就为false。

RabbitTemplate.ConfirmCallback 和 ConfirmListener 区别

在RabbitMQ中, ConfirmListener和ConfirmCallback都是⽤来处理消息确认的机制, 但它们属于不同的客⼾端库, 并且使⽤的场景和⽅式有所不同.
1. ConfirmListener 是 RabbitMQ Java Client 库中的接⼝. 这个库是 RabbitMQ 官⽅提供的⼀个直接与RabbitMQ服务器交互的客⼾端库. ConfirmListener 接⼝提供了两个⽅法: handleAck 和handleNack, ⽤于处理消息确认和否定确认的事件.
2. ConfirmCallback 是 Spring AMQP 框架中的⼀个接⼝. 专⻔为Spring环境设计. ⽤于简化与
RabbitMQ交互的过程. 它只包含⼀个 confirm ⽅法,⽤于处理消息确认的回调.
在 Spring Boot 应⽤中, 通常会使⽤ ConfirmCallback, 因为它与 Spring 框架的其他部分更加整合, 可以利⽤ Spring 的配置和依赖注⼊功能. ⽽在使⽤ RabbitMQ Java Client 库时, 则可能会直接实现ConfirmListener 接⼝, 更直接的与RabbitMQ的Channel交互

编写生产消息代码
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;import java.util.Date;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/confirm")public String confirm() {confirmRabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息发送成功";}
}
public interface ConfirmCallback {
        /**
        * 确认回调
        * @param correlationData: 发送消息时的附加信息 , 通常⽤于在确认回调中识别特定的消
        * @param ack: 交换机是否收到消息 , 收到为 true, 未收到为 false
        * @param cause: 当消息确认失败时 , 这个字符串参数将提供失败的原因 . 这个原因可以⽤于调 试和错误处理 .
        * 成功时 , cause null
        */
        void confirm ( @Nullable CorrelationData correlationData, boolean ack,
        @Nullable String cause);
}

生产消息1

第一次生产消息

第二次生产消息

此时我们看到,第一次生产消息时能够正常生产消息,但是当我们第二次生产消息时却抛异常了,异常信息为:java.lang.IllegalStateException: Only one ConfirmCallback is supported by each RabbitTemplate

解决方法

是为什么呢?从异常信息中我们可以看到,ConfirmCallback只能被设置一次,但是从我们的代码中可以看到,我们每次生产消息时都会设置一次ConfirmCallback,显然这就是问题所在。

下面我们把刚刚的ConfirmCallback提取出来,重新设置RabbitTemplate。

RabbitTemplateConfig

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});return rabbitTemplate;}
}

ProducerController

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import rabbitextensionsdemo.constant.Constants;@RequestMapping("/producer")
@RestController
public class ProducerController {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate RabbitTemplate confirmRabbitTemplate;@RequestMapping("/pres")public String pres() {Message message = new Message("Presistent test...".getBytes(), new MessageProperties());//消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);System.out.println(message);rabbitTemplate.convertAndSend(Constants.PRES_EXCHANGE, "pres", message);return "消息发送成功";}@RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "confirm test...", correlationData);return "消息发送成功";}
}

生产消息

多次生产消息2

此时我们可以看到,我们解决了前面多次生产消息导致的ConfirmCallback被设置多次的问题,但是我们此时的代码就真的没有问题了吗?

当我们生产其它消息时,发现我们并没有给这个生产消息的方法设置ConfirmCallback啊,但是为什么在控制台上看到执行了我们设置的ConfrimCallback,这是为什么呢?

是因为我们在前面设置了RabbitTemplate,而且使用了@Autowired注解注入了RabbitTemplate,虽然我们注入了两个,一个是rabbitTemplate,一个是confirmRabbitTemplate,但是这两个都是同一个RabbitTemplate。

解决方法

解决办法:我们在RabbitTemplateConfig中设置两个RabbitTemplate.

import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});return rabbitTemplate;}
}

与此同时,我们修改注入方式:

此时,当再次使用/producer/pres来生产消息时,就没问题了。

生产消息3

下面我们修改一下生产消息时给消息设置的路由规则:

    @RequestMapping("/confirm")public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息发送成功";}

生产消息

我们知道,上面生产消息时给消息设置的路由规则并不存在,按道理说,应该会打印“未收到消息”而非“收到消息”,原因是因为,上面的confirm确认模式是用来确定生产消息是否到达了交换机,而上面的路由规则是针对消息从交换机到队列的,解决上面的路由问题使用到另一种确认模式。

return 模式

消息到达Exchange之后, 会根据路由规则匹配, 把消息放⼊Queue中. Exchange到Queue的过程, 如果⼀条消息⽆法被任何队列消费(即没有队列与消息的路由键匹配或队列不存在等), 可以选择把消息退回给发送者. 消息退回给发送者时, 我们可以设置⼀个返回回调⽅法, 对消息进⾏处理。

修改RabbitTemplateConfig,设置消息退回的回调方法

import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitTemplateConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);return rabbitTemplate;}@Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}
}

使⽤RabbitTemplate的setMandatory⽅法设置消息的mandatory属性为true(默认为false). 这个属性
的作⽤是告诉RabbitMQ, 如果⼀条消息⽆法被任何队列消费, RabbitMQ应该将消息返回给发送者, 此时 ReturnCallback 就会被触发。

回调函数中有⼀个参数: ReturnedMessage, 包含以下属性:

public class ReturnedMessage {
        //返回的消息对象,包含了消息体和消息属性
        private final Message message;
        //由 Broker 提供的回复码 , 表⽰消息⽆法路由的原因 . 通常是⼀个数字代码,每个数字代表不同 的含义 .
        private final int replyCode;
        //⼀个⽂本字符串 , 提供了⽆法路由消息的额外信息或错误描述 .
        private final String replyText;
        //消息被发送到的交换机名称
        private final String exchange;
        //消息的路由键,即发送消息时指定的键
        private final String routingKey;
}
编写生产消息代码(路由正确)
    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm", "returns test...", correlationData);return "消息发送成功";}
生产消息1

编写生产消息代码(路由错误)
    @RequestMapping("/returns")public String returns() {CorrelationData correlationData = new CorrelationData("5");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "returns test...", correlationData);return "消息发送成功";}
生产消息2

此时我们可以看到,队列中依旧是只有1条消息,而且代码执行了消息退回,而且消息退回时打印了消息信息,显然我们可以看到,消息的路由规则是错误的,不会入队列。

面试题

如何保证RabbitMQ消息的可靠传输?

从这个图中, 可以看出, 消息可能丢失的场景以及解决⽅案:

1. ⽣产者将消息发送到 RabbitMQ失败
        a. 可能原因: ⽹络问题等
        b. 解决办法: [发送⽅确认-confirm确认模式]
2. 消息在交换机中⽆法路由到指定队列:
        a. 可能原因: 代码或者配置层⾯错误, 导致消息路由失败
        b. 解决办法: [发送⽅确认-return模式]
3. 消息队列⾃⾝数据丢失
        a. 可能原因: 消息到达RabbitMQ之后, RabbitMQ Server 宕机导致消息丢失.
        b. 解决办法: [持久性]. 开启 RabbitMQ持久化, 就是消息写⼊之后会持久化到磁盘, 如果RabbitMQ 挂了, 恢复之后会⾃动读取之前存储的数据. (极端情况下, RabbitMQ还未持久化就挂了, 可能导致少量数据丢失, 这个概率极低, 也可以通过集群的⽅式提⾼可靠性)
4. 消费者异常, 导致消息丢失
        a. 可能原因: 消息到达消费者, 还没来得及消费, 消费者宕机. 消费者逻辑有问题.
        b. 解决办法: [消息确认]. RabbitMQ 提供了 消费者应答机制 来使 RabbitMQ 能够感知到消费者是否消费成功消息. 默认情况下消费者应答机制是⾃动应答的, 可以开启⼿动确认, 当消费者确认消费成功后才会删除消息, 从⽽避免消息丢失. 除此之外, 也可以配置重试机制, 当消息消费异常时, 通过消息重试确保消息的可靠性。

欢迎大家来访问我的主页----》链接


http://www.hkcw.cn/article/YfIJBdbBxW.shtml

相关文章

端午假期重庆共揽客730余万 文旅活动丰富多彩

端午假期,重庆文旅市场活力十足。据重庆市文化和旅游数据中心初步测算,全市接待国内游客730.11万人次,同比增长4.1%,国内游客花费45.13亿元,同比增长9.4%。从景区来看,端午节假日期间,重庆市重点监测的140家景区累计接待游客255.8万人次,同比增长6.3%。洪崖洞风俗风貌区…

国足生死战继续442 出线希望悬于一线

中国男足将于2025年6月5日迎来2026年世界杯亚洲区预选赛18强赛的关键一战,客场挑战印度尼西亚队。这场比赛对于国足来说至关重要,只有胜利才能保留出线希望,输球或平局都将导致淘汰。中国男足25名球员在主教练伊万的带领下于2日晚抵达印尼开始备战。从此前公布的大名单及上海…

民警吃馄饨时助店主挽回30万 休假不忘职责

近日,一面锦旗和一封感谢信送到了云南西双版纳边境管理支队,同事们才知道民警田铁林在休假期间“吃了一碗价值30万的馄饨”。不久前,休假中的田铁林在一家街头馄饨店用餐。店里杂乱的餐桌没人收拾,老板娘玉女士坐在厨房接电话,眉头紧皱,嘴里念叨着数字验证码。田铁林凭借…

【图像处理】基于双目立体匹配的景深计算(Matlab代码实现)​

👨‍🎓个人主页 💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰&a…

VGG16模型:图像处理深度学习的象征

本文还有配套的精品资源,点击获取 简介:VGG16,作为深度学习领域的里程碑,以其独特的16层深度网络架构在2014年ILSVRC中取得突破。该模型主要采用3x3的小型卷积核,通过深层次的卷积层堆叠来提升模型复杂度。VGG16通常…

使用场景广泛存在于各种编程应用当中,比如图像处理中的像素点集合、科学计算里的数值

数组的数据结构 数组是一种线性的数据结构,其中所有的元素都具有相同的数据类型并按照连续的方式存储在内存中。这种特性使得通过索引访问特定位置上的元素变得非常高效。 对于多维数组而言,其本质上是由多个一维数组嵌套而成。例如二维数组可以视为由…

计算机眼中的图像处理基础:灰度化与二值化实验详解

在计算机视觉和图像处理领域,理解计算机如何"看"图像是基础中的基础。本文将详细介绍图像处理中的两个核心概念:灰度化和二值化,并通过实验方法展示不同算法的效果差异。 一、计算机眼中的图像 1. 像素 像素(Pixel)是图像的基本…

OpenCV学习路线全解析!从入门图像处理到计算机视觉实战,搭建你的视觉AI技能体系

想进入计算机视觉领域,OpenCV 几乎是绕不开的第一站。它是一个开源的视觉工具库,拥有丰富的图像处理、识别、追踪等能力,被广泛用于教育、科研和工业场景。 但 OpenCV 功能强大,文档复杂,很多初学者“安装完不知道干啥…

图像处理——卷积

一、什么是卷积 卷积可以理解为两个函数f和g的重叠运算,通过将一个函数翻转并滑动到另一个函数上,计算它们在重叠点的乘积并求和,从而生成一个新的函数。数学上,卷积的表达式通常为: (f∗g)(t)∫−∞∞​f(τ)g(t−τ)…

【图像轮廓特征查找】图像处理(OpenCV) -part8

17 图像轮廓特征查找 图像轮廓特征查找其实就是他的外接轮廓。 应用: 图像分割 形状分析 物体检测与识别 根据轮廓点进行,所以要先找到轮廓。 先灰度化、二值化。目标物体白色,非目标物体黑色,选择合适的儿值化方式。 有了轮…

CImage类在VS2010中的应用与图像处理教程

本文还有配套的精品资源,点击获取 简介:本文旨在介绍如何在Visual Studio 2010中使用MFC库的CImage类进行图像处理。首先概述了CImage类的功能,然后详细讲解了加载、显示、保存图像以及进行裁剪、缩放、旋转等操作的方法。提供了示例代码&a…

【机器学习】图像处理与深度学习利器:OpenCV实战攻略全面解析

🎬 鸽芷咕:个人主页 🔥 个人专栏: 《C干货基地》《粉丝福利》 ⛺️生活的理想,就是为了理想的生活! 前言 OpenCV想必大家都听过跨平台计算机视觉库,可以运行在Linux、Windows、Android和Mac OS操作系统上。它轻量级而…

机器学习中的图像处理与计算机视觉

引言 在现代计算机科学中,图像处理和计算机视觉已成为最活跃的研究领域之一,这得益于机器学习和深度学习的发展。本文将深入探讨图像处理与计算机视觉的基础概念、常见应用、关键技术、常用工具,以及在这些领域中的代码示例。通过本篇文章&a…

Fiji —— 基于 imageJ 的免费且开源的图像处理软件

文章目录 一、Fiji —— 基于 imageJ 的免费且开源的图像处理软件1.1、Fiji工具安装(免费)1.2、Fiji源码下载1.2、Fiji - Plugins插件安装 二、功能详解2.0、Fiji - ImageJ(Web应用程序)2.1、常用功能(汇总&#xff09…

深入了解 OpenCV:C# 开发者的图像处理利器

OpenCV(Open Source Computer Vision Library)是一个开源的计算机视觉与图像处理库,自 2000 年由 Intel 开发以来,已经发展成为业界领先的图像处理框架之一。凭借其跨平台特性、丰富的功能集以及活跃的社区支持,OpenCV…

巴黎世家平角短裤造型裙子已缺货 时尚争议再起

近日,奢侈品牌巴黎世家推出的一款售价4500元的女款半身裙在网上引发热议。不少网友吐槽该裙子造型与平角短裤极为相似,直呼“看不懂时尚”。据巴黎世家官网介绍,这款深蓝色弹力平纹针织半身裙亮相于2025秋季系列Look 50和Look 54。裙子采用弹力棉混纺平纹针织面料,设计为平…

日本男子杀害妻子幼女后自杀 家庭悲剧震惊邻里

6月2日上午,日本大阪府吹田市发生一起悲剧。一名26岁男子从世博会馆附近的一座天桥上跳下自杀。警方随后在其家中发现了他妻子和两名幼女的尸体,三人腹部血流不止,已经死亡。现场还发现了一把带血的菜刀和一张字条,字条上写着对女儿和妻子的歉意。据警方透露,当天早上6点2…

女婴术后脑损伤疑撞到床栏 家属求真相艰难

近日,四川的徐女士反映,她五个多月大的孩子鱼鱼在四川大学华西第二医院锦江院区做完心脏手术后,头部出现了一个创口。经检查,鱼鱼被诊断为脑出血和脑损伤,并伴有癫痫。当地卫健委介入调查后未能得出明确结论。5月29日,记者在事发医院见到已经一岁多的鱼鱼,她仍旧不会爬行…

A股六月开门红 三大指数集体反弹

端午节三天小长假后,六月首个交易日A股三大指数集体反弹,给股民朋友们带来了一个“开门红”。这是继五月“开门红”之后,A股再度取得月度良好开局。截至收盘,沪指涨0.43%,深成指涨0.16%,创业板指涨0.48%,北证50指数涨1.03%。全市场成交额达到11638亿元,较上日放量4亿元…

端午节假期国内出游1.19亿人次 文旅融合添彩传统节日

文化和旅游部6月3日公布了2025年端午节假期文化和旅游市场的情况。经测算,假期三天内,全国国内出游人数达到1.19亿人次,同比增长5.7%;国内出游总花费为427.30亿元,同比增长5.9%。在假期期间,群众积极参与赛龙舟、吃粽子、唱山歌和赏古曲等活动,传统节日文化内涵与旅游发…