RocketMQ 消息发送核心源码解析:DefaultMQProducerImpl.send () 方法深度剖析

article/2025/6/30 20:24:50

引言

在分布式系统中,消息队列是实现异步通信、服务解耦和流量削峰的关键组件。Apache RocketMQ 作为一款高性能、高可靠的消息中间件,被广泛应用于各类互联网场景。其中,消息发送是最基础也是最重要的功能之一。本文将深入剖析 RocketMQ 中 DefaultMQProducerImpl.send() 方法的实现原理,带你了解消息发送的核心流程和关键技术点。

一、DefaultMQProducerImpl 概述

DefaultMQProducerImpl 是 RocketMQ 消息生产者的核心实现类,它实现了消息发送的具体逻辑。其类层次结构如下:

DefaultMQProducerImpl├── 实现了 MQProducerInner 接口├── 依赖于 MQClientInstance 进行网络通信├── 包含 TopicPublishInfoManager 管理主题路由信息├── 使用 SendMessageHookList 支持发送钩子扩展

 核心属性

 private final InternalLogger log = ClientLogger.getLog();/*** 随机数*/private final Random random = new Random();/*** 关联的消息生产者的组件*/private final DefaultMQProducer defaultMQProducer;/*** topic 发布信息的映射表*/private final ConcurrentMap<String/* topic */, TopicPublishInfo> topicPublishInfoTable =new ConcurrentHashMap<String, TopicPublishInfo>();/*** 发送消息的钩子List**/private final ArrayList<SendMessageHook> sendMessageHookList = new ArrayList<SendMessageHook>();/*** 完成事务消息的钩子list*/private final ArrayList<EndTransactionHook> endTransactionHookList = new ArrayList<EndTransactionHook>();/*** rpc钩子*/private final RPCHook rpcHook;/*** 异步发送线程池队列*/private final BlockingQueue<Runnable> asyncSenderThreadPoolQueue;/*** 默认异步发送消息的线程池*/private final ExecutorService defaultAsyncSenderExecutor;/*** 检查请求的队列*/protected BlockingQueue<Runnable> checkRequestQueue;/*** 检查请求的线程池*/protected ExecutorService checkExecutor;/*** 服务的状态*/private ServiceState serviceState = ServiceState.CREATE_JUST;/*** 客户端的实例*/private MQClientInstance mQClientFactory;/*** 检查禁用钩子的list*/private ArrayList<CheckForbiddenHook> checkForbiddenHookList = new ArrayList<CheckForbiddenHook>();/*** 容错策略*/private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();/*** 异步化发送消息的线程池*/private ExecutorService asyncSenderExecutor;

二、send () 方法的核心流程

2.1 方法签名与重载

DefaultMQProducerImpl 提供了多个重载的 send() 方法,支持同步、异步和单向发送模式:

// 同步发送
public SendResult send(Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;// 异步发送
public void send(Message msg, SendCallback sendCallback) throws MQClientException, RemotingException, InterruptedException;// 单向发送
public void sendOneway(Message msg) throws MQClientException, RemotingException, InterruptedException;// 带超时参数的同步发送
public SendResult send(Message msg, long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;// 带队列选择器的发送
public SendResult send(Message msg, MessageQueueSelector selector, Object arg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException;

2.2 核心发送流程

所有 send() 方法最终都会调用 sendDefaultImpl() 方法,这是消息发送的核心实现:

    private SendResult sendDefaultImpl(Message msg,final CommunicationMode communicationMode,final SendCallback sendCallback,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {this.makeSureStateOK();Validators.checkMessage(msg, this.defaultMQProducer);final long invokeID = random.nextLong();long beginTimestampFirst = System.currentTimeMillis();long beginTimestampPrev = beginTimestampFirst;long endTimestamp = beginTimestampFirst;//根据topic进行查找topic的路由信息TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());if (topicPublishInfo != null && topicPublishInfo.ok()) {boolean callTimeout = false;MessageQueue mq = null;Exception exception = null;SendResult sendResult = null;//设置总的重试次数int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;int times = 0;String[] brokersSent = new String[timesTotal];for (; times < timesTotal; times++) {String lastBrokerName = null == mq ? null : mq.getBrokerName();//进行选择一个消息队列MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);//选择出来的队列不为nullif (mqSelected != null) {mq = mqSelected;brokersSent[times] = mq.getBrokerName();try {beginTimestampPrev = System.currentTimeMillis();if (times > 0) {//Reset topic with namespace during resend.msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));}long costTime = beginTimestampPrev - beginTimestampFirst;if (timeout < costTime) {callTimeout = true;break;}//走一个网络通信逻辑sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);switch (communicationMode) {case ASYNC:return null;case ONEWAY:return null;case SYNC:if (sendResult.getSendStatus() != SendStatus.SEND_OK) {if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {continue;}}return sendResult;default:break;}} catch (RemotingException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQClientException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;continue;} catch (MQBrokerException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());exception = e;if (this.defaultMQProducer.getRetryResponseCodes().contains(e.getResponseCode())) {continue;} else {if (sendResult != null) {return sendResult;}throw e;}} catch (InterruptedException e) {endTimestamp = System.currentTimeMillis();this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);log.warn(msg.toString());throw e;}} else {break;}}if (sendResult != null) {return sendResult;}String info = String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s",times,System.currentTimeMillis() - beginTimestampFirst,msg.getTopic(),Arrays.toString(brokersSent));info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);MQClientException mqClientException = new MQClientException(info, exception);if (callTimeout) {throw new RemotingTooMuchRequestException("sendDefaultImpl call timeout");}if (exception instanceof MQBrokerException) {mqClientException.setResponseCode(((MQBrokerException) exception).getResponseCode());} else if (exception instanceof RemotingConnectException) {mqClientException.setResponseCode(ClientErrorCode.CONNECT_BROKER_EXCEPTION);} else if (exception instanceof RemotingTimeoutException) {mqClientException.setResponseCode(ClientErrorCode.ACCESS_BROKER_TIMEOUT);} else if (exception instanceof MQClientException) {mqClientException.setResponseCode(ClientErrorCode.BROKER_NOT_EXIST_EXCEPTION);}throw mqClientException;}validateNameServerSetting();throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);}

三、关键步骤详解

3.1 获取主题路由信息

 private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {//从本地的Map中进行获取Topic的路由信息TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);//本地的Map中没有这个Topic的路由信息,那么就进行从NameServer中获取路由信息if (null == topicPublishInfo || !topicPublishInfo.ok()) {this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());// 从远程的NameServer中获取路由信息this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);topicPublishInfo = this.topicPublishInfoTable.get(topic);}//topic的路由信息是可用的,那么就直接返回这个topicPublishInfoif (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {return topicPublishInfo;} else {//从远程的NameServer中获取路由信息 在进行返回this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);topicPublishInfo = this.topicPublishInfoTable.get(topic);return topicPublishInfo;}}

3.2 消息队列选择

/*** 选择一个可用的队列* @param tpInfo topic的路由信息* @param lastBrokerName 上一次发送时候的broker* @return*/public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {if (this.sendLatencyFaultEnable) {try {//对这个queue进行累加一下indexint index = tpInfo.getSendWhichQueue().incrementAndGet();//遍历topic里的每个consumeQueuefor (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {//针对index 进行取模int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();if (pos < 0)pos = 0;//轮询选择一个messageQueueMessageQueue mq = tpInfo.getMessageQueueList().get(pos);//判断这个broker是否可用if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))return mq;}//遍历完之后,发现没有可用的broker 至少选择一个brokerfinal String notBestBroker = latencyFaultTolerance.pickOneAtLeast();int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);if (writeQueueNums > 0) {final MessageQueue mq = tpInfo.selectOneMessageQueue();if (notBestBroker != null) {return new MessageQueue(mq.getTopic(), notBestBroker, tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);} else {return mq;}} else {latencyFaultTolerance.remove(notBestBroker);}} catch (Exception e) {log.error("Error occurred when selecting message queue", e);}return tpInfo.selectOneMessageQueue();}return tpInfo.selectOneMessageQueue(lastBrokerName);}

3.3 底层消息发送

private SendResult sendKernelImpl(final Message msg,final MessageQueue mq,final CommunicationMode communicationMode,final SendCallback sendCallback,final TopicPublishInfo topicPublishInfo,final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {long beginStartTime = System.currentTimeMillis();//获取broker地址String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());if (null == brokerAddr) {tryToFindTopicPublishInfo(mq.getTopic());brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());}SendMessageContext context = null;if (brokerAddr != null) {brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);byte[] prevBody = msg.getBody();try {//for MessageBatch,ID has been set in the generating processif (!(msg instanceof MessageBatch)) {MessageClientIDSetter.setUniqID(msg);}boolean topicWithNamespace = false;if (null != this.mQClientFactory.getClientConfig().getNamespace()) {msg.setInstanceId(this.mQClientFactory.getClientConfig().getNamespace());topicWithNamespace = true;}int sysFlag = 0;boolean msgBodyCompressed = false;if (this.tryToCompressMessage(msg)) {sysFlag |= MessageSysFlag.COMPRESSED_FLAG;sysFlag |= compressType.getCompressionFlag();msgBodyCompressed = true;}final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (Boolean.parseBoolean(tranMsg)) {sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;}if (hasCheckForbiddenHook()) {CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());checkForbiddenContext.setCommunicationMode(communicationMode);checkForbiddenContext.setBrokerAddr(brokerAddr);checkForbiddenContext.setMessage(msg);checkForbiddenContext.setMq(mq);checkForbiddenContext.setUnitMode(this.isUnitMode());this.executeCheckForbiddenHook(checkForbiddenContext);}if (this.hasSendMessageHook()) {context = new SendMessageContext();context.setProducer(this);context.setProducerGroup(this.defaultMQProducer.getProducerGroup());context.setCommunicationMode(communicationMode);context.setBornHost(this.defaultMQProducer.getClientIP());context.setBrokerAddr(brokerAddr);context.setMessage(msg);context.setMq(mq);context.setNamespace(this.defaultMQProducer.getNamespace());String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);if (isTrans != null && isTrans.equals("true")) {context.setMsgType(MessageType.Trans_Msg_Half);}if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {context.setMsgType(MessageType.Delay_Msg);}this.executeSendMessageHookBefore(context);}//发送消息的请求头SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());requestHeader.setTopic(msg.getTopic());requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());requestHeader.setQueueId(mq.getQueueId());requestHeader.setSysFlag(sysFlag);requestHeader.setBornTimestamp(System.currentTimeMillis());requestHeader.setFlag(msg.getFlag());requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));requestHeader.setReconsumeTimes(0);requestHeader.setUnitMode(this.isUnitMode());requestHeader.setBatch(msg instanceof MessageBatch);requestHeader.setBname(mq.getBrokerName());if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);if (reconsumeTimes != null) {requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);}String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);if (maxReconsumeTimes != null) {requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);}}SendResult sendResult = null;switch (communicationMode) {case ASYNC:Message tmpMessage = msg;boolean messageCloned = false;if (msgBodyCompressed) {//If msg body was compressed, msgbody should be reset using prevBody.//Clone new message using commpressed message body and recover origin massage.//Fix bug:https://github.com/apache/rocketmq-externals/issues/66tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;msg.setBody(prevBody);}if (topicWithNamespace) {if (!messageCloned) {tmpMessage = MessageAccessor.cloneMessage(msg);messageCloned = true;}msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}long costTimeAsync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeAsync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),tmpMessage,requestHeader,timeout - costTimeAsync,communicationMode,sendCallback,topicPublishInfo,this.mQClientFactory,this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),context,this);break;case ONEWAY:case SYNC: //同步发送long costTimeSync = System.currentTimeMillis() - beginStartTime;if (timeout < costTimeSync) {throw new RemotingTooMuchRequestException("sendKernelImpl call timeout");}//同步发送消息sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(brokerAddr,mq.getBrokerName(),msg,requestHeader,timeout - costTimeSync,communicationMode,context,this);break;default:assert false;break;}if (this.hasSendMessageHook()) {context.setSendResult(sendResult);this.executeSendMessageHookAfter(context);}return sendResult;} catch (RemotingException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (MQBrokerException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} catch (InterruptedException e) {if (this.hasSendMessageHook()) {context.setException(e);this.executeSendMessageHookAfter(context);}throw e;} finally {msg.setBody(prevBody);msg.setTopic(NamespaceUtil.withoutNamespace(msg.getTopic(), this.defaultMQProducer.getNamespace()));}}throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);}

3.4 调用NettyClient发送消息

 public RemotingCommand invokeSync(String addr, final RemotingCommand request, long timeoutMillis)throws InterruptedException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException {//获取一个当前的时间戳long beginStartTime = System.currentTimeMillis();//这里获取一个channel 这个channel 就是Broker跟NameServer之间的连接final Channel channel = this.getAndCreateChannel(addr);//如果连接存在并且连接是活跃的 那么就可以发送请求if (channel != null && channel.isActive()) {try {//执行发送前的一些操作 先不关心doBeforeRpcHooks(addr, request);long costTime = System.currentTimeMillis() - beginStartTime;if (timeoutMillis < costTime) {throw new RemotingTimeoutException("invokeSync call the addr[" + addr + "] timeout");}//这里就是真正发送网络请求的地方RemotingCommand response = this.invokeSyncImpl(channel, request, timeoutMillis - costTime);//执行完成后的一些操作 先不关心doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(channel), request, response);return response;} catch (RemotingSendRequestException e) {log.warn("invokeSync: send request exception, so close the channel[{}]", addr);this.closeChannel(addr, channel);throw e;} catch (RemotingTimeoutException e) {if (nettyClientConfig.isClientCloseSocketIfTimeout()) {this.closeChannel(addr, channel);log.warn("invokeSync: close socket because of timeout, {}ms, {}", timeoutMillis, addr);}log.warn("invokeSync: wait response timeout exception, the channel[{}]", addr);throw e;}} else {this.closeChannel(addr, channel);throw new RemotingConnectException(addr);}}
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request,final long timeoutMillis)throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException {final int opaque = request.getOpaque();try {//初始化响应的future对象final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null);this.responseTable.put(opaque, responseFuture);//获取请求的网络连接地址final SocketAddress addr = channel.remoteAddress();channel.writeAndFlush(request).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture f) throws Exception {if (f.isSuccess()) {responseFuture.setSendRequestOK(true);return;} else {responseFuture.setSendRequestOK(false);}responseTable.remove(opaque);responseFuture.setCause(f.cause());responseFuture.putResponse(null);log.warn("send a request command to channel <" + addr + "> failed.");}});//等待响应 使用countDownLatch进行同步的等待RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis);//没有返回结果if (null == responseCommand) {if (responseFuture.isSendRequestOK()) {//请求超时了 发送成功了throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis,responseFuture.getCause());} else {//请求压根就没有发送成功throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause());}}//如果响应成功return responseCommand;} finally {//最终一定会把响应从table中删除this.responseTable.remove(opaque);}}

 3.5 BrokerController接收请求

        客户端发送消息给BrokerController,BrokerController会进行调用SendMessageProcessor进行处理发送过来的消息。

    public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final SendMessageContext mqtraceContext;switch (request.getCode()) {case RequestCode.CONSUMER_SEND_MSG_BACK:return this.asyncConsumerSendMsgBack(ctx, request);default://解析出来一个发送消息请求的headerSendMessageRequestHeader requestHeader = parseRequestHeader(request);if (requestHeader == null) {return CompletableFuture.completedFuture(null);}//构建一个消息的上下文mqtraceContext = buildMsgContext(ctx, requestHeader);this.executeSendMessageHookBefore(ctx, request, mqtraceContext);if (requestHeader.isBatch()) {return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);} else {return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);}}}

  处理单个消息的逻辑:asyncSendMessage,调用底层的存储逻辑进行异步增加消息。

this.brokerController.getMessageStore().asyncPutMessage(msgInner);

最终会进行调用 CommitLog#asyncPutMessage的方法,根据决定是调用CommitLog#asyncPutMessage还是调用 DLedgerCommitLog#asyncPutMessage方法。如果是调用 DLedgerCommitLog#asyncPutMessage方法,最终会进行调用DLedgerServer#handleAppend。

保存完消息之后,ReputMessageService线程会调用CommitLogDispatcherBuildConsumeQueue(向ConsumeQueue中添加数据)和CommitLogDispatcherBuildIndex(向索引中添加数据) 这两个分发组件。

四、性能优化与最佳实践

4.1 性能优化建议

  1. 批量发送:对于小消息,使用批量发送提高吞吐量

List<Message> messages = new ArrayList<>();
messages.add(new Message("TopicTest", "TagA", "Message 1".getBytes()));
messages.add(new Message("TopicTest", "TagA", "Message 2".getBytes()));
producer.send(messages);

   2.异步发送:对响应时间敏感的场景使用异步模式

producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理成功响应}@Overridepublic void onException(Throwable e) {// 处理异常}
});

3.合理配置重试次数:根据业务需求调整 retryTimesWhenSendFailed

4.启用故障延迟机制:避免向故障 Broker 发送消息

4.2 监控与告警

  1. 发送成功率:监控 SendStatus.SEND_OK 的比例

  2. 响应时间:监控消息发送的平均响应时间

  3. 异常日志:关注 sendDefaultImpl 方法中的异常捕获

  4. Broker 状态:监控 Broker 的可用性和负载情况


http://www.hkcw.cn/article/VdNmRTPetH.shtml

相关文章

计算机视觉NeRF

NeRF与3DGS学习 NeRF计算机视觉的问题NeRF定义神经辐射场场景表示基于辐射场的体渲染分层采样优化神经辐射场 基础知识初始化SFM基础矩阵 & 本质矩阵 & 单应矩阵从已经估得的本质矩阵E&#xff0c;恢复出相机的运动R,tSVD 分解 NeRF NeRF资源 计算机视觉的问题 计算…

分班 - 华为OD统一考试(JavaScript 题解)

华为OD机试题库《C》限时优惠 9.9 华为OD机试题库《Python》限时优惠 9.9 华为OD机试题库《JavaScript》限时优惠 9.9 针对刷题难&#xff0c;效率慢&#xff0c;我们提供一对一算法辅导&#xff0c; 针对个人情况定制化的提高计划&#xff08;全称1V1效率更高&#xff09;。 看…

CVE-2021-28169源码分析与漏洞复现(Jetty信息泄露)

漏洞概述 漏洞名称&#xff1a;Jetty ConcatServlet 多重解码导致 WEB-INF 敏感信息泄露 漏洞编号&#xff1a;CVE-2021-28169 CVSS 评分&#xff1a;7.5 影响版本&#xff1a; Jetty 9.4.0 - 9.4.39Jetty 10.0.0 - 10.0.1Jetty 11.0.0 - 11.0.1 修复版本&#xff1a;Jetty ≥…

CLion调试无法触发断点

CLion 调试时执行的是cmake-build-release目录中的exe&#xff0c;无法触发断点 这里配置要选择Debug&#xff0c;不要选择Release

2024年数维杯国际大学生数学建模挑战赛C题时间信号脉冲定时噪声抑制与大气时延抑制模型解题全过程论文及程序

2024年数维杯国际大学生数学建模挑战赛 C题 时间信号脉冲定时噪声抑制与大气时延抑制模型 原题再现&#xff1a; 脉冲星是一种快速旋转的中子星&#xff0c;具有连续稳定的旋转&#xff0c;因此被称为“宇宙灯塔”。脉冲星的空间观测在深空航天器导航和时间标准维护中发挥着至…

50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | Sound Board(音响控制面板)

&#x1f4c5; 我们继续 50 个小项目挑战&#xff01;—— SoundBoard 组件 仓库地址&#xff1a;https://github.com/SunACong/50-vue-projects 项目预览地址&#xff1a;https://50-vue-projects.vercel.app/ &#x1f3af; 组件目标 实现一个响应式按钮面板&#xff0c;点…

【目标检测数据集】电动车驾驶员戴头盔相关数据集

一、TWHD 数据集 介绍 随着国家惩治行车不戴头盔违法行为的力度不断加大&#xff0c;双轮车&#xff08;电动车与摩托车&#xff09;头盔检测任务也越来越重要。双轮车佩戴头盔检测数据集&#xff08;two wheeler helmet dataset&#xff0c;TWHD&#xff09;收集了来自开源数…

【机器学习基础】机器学习入门核心:数学基础与Python科学计算库

机器学习入门核心&#xff1a;数学基础与Python科学计算库 一、核心数学基础回顾1. 函数与导数2. Taylor公式3. 概率论基础4. 统计量5. 重要定理6. 最大似然估计&#xff08;MLE&#xff09;7. 线性代数 二、Python科学计算库精要1. NumPy&#xff1a;数值计算核心2. SciPy&…

【存储基础】SAN存储基础知识

文章目录 1. 什么是SAN存储&#xff1f;2. SAN存储组网架构3. SAN存储的主要协议SCSI光纤通道&#xff08;FC&#xff09;协议iSCSIFCoENVMe-oFIB 4. SAN存储的关键技术Thin Provision&#xff1a;LUN空间按需分配Tier&#xff1a;分级存储Cache&#xff1a;缓存机制QoS&#x…

缓解颈部不适的营养补给之道

对于颈部常有不适的人群而言&#xff0c;合理的营养补充是维持身体良好状态的重要方式。日常饮食中&#xff0c;蛋白质是不容小觑的营养元素。瘦肉、蛋类、奶类以及豆制品都是优质蛋白质的来源&#xff0c;它们能够帮助增强肌肉力量&#xff0c;为颈部提供更好的支撑。​ 维生…

ck-editor5的研究 (6):进一步优化页面刷新时,保存提示的逻辑

文章目录 一、前言二、实现步骤1. 第一步: 引入 PendingActions 插件2. 第二步&#xff1a;注册事件3. 第三步&#xff1a;点击保存按钮时&#xff0c;控制状态变化 三、测试效果和细节四、总结 一、前言 在上一篇文章中 ck-editor5的研究 (5)&#xff1a;优化-页面离开时提醒…

手机归属地查询接口如何用Java调用?

一、什么是手机归属地查询接口&#xff1f; 是一种便捷、高效的工具&#xff0c;操作简单&#xff0c;请求速度快。它不仅能够提高用户填写地址的效率&#xff0c;还能帮助企业更好地了解客户需求&#xff0c;制定个性化的营销策略&#xff0c;降低风险。随着移动互联网的发展…

列表推导式(Python)

[表达式 for 变量 in 列表] 注意&#xff1a;in后面不仅可以放列表&#xff0c;还可以放range ()可迭代对象 [表达式 for 变量 in 列表 if 条件]

【机器学习|评价指标4】正预测值(PPV)、负预测值(NPV)、假阴性率(FNR)、假阳性率(FPR)详解,附代码。

【机器学习|评价指标4】正预测值&#xff08;PPV&#xff09;、负预测值&#xff08;NPV&#xff09;、假阴性率&#xff08;FNR&#xff09;、假阳性率&#xff08;FPR&#xff09;详解&#xff0c;附代码。 【机器学习|评价指标4】正预测值&#xff08;PPV&#xff09;、负预…

【Delphi】实现在多显示器时指定程序运行在某个显示器上

在多显示器时代&#xff0c;经常会出现期望将程序运行在某个指定的显示器上&#xff0c;特别是在调试程序的时候&#xff0c;期望切换分辨率&#xff0c;单步调试时&#xff0c;此时容易导致互相卡住&#xff0c;非常不方便&#xff0c;但是通过指定程序运行在不同的显示器上就…

渗透实战PortSwigger Labs AngularJS DOM XSS利用详解

本Lab学习到关于AngularJS的 xss 漏洞利用 直接输入回显页面&#xff0c;但是把<>进了 html 编码了 当我们输入{{11}}&#xff0c;没有当作字符处理&#xff0c;而是执行了 {{}} 是多种前端框架&#xff08;如 Vue、Angular、Django 模板等&#xff09;中常见的模板插值语…

配置刷新技术

FPGA 片上三模冗余( TMR) 设计结合配置刷新( Scrubbing) 的防护方法能够有效地提高系统的抗单粒子翻转性能。 三模冗余的方法利用模块三冗余及三取二自动表决来掩蔽错误&#xff0c;但是如果错误积累到一定程度&#xff0c;导致同时有两个或两个以上模块发生翻转错误&#xff0…

计算机科技笔记: 容错计算机设计05 n模冗余系统 其他复杂结构

目录 NMR变体动态冗余系统混合冗余系统筛除新系统 NMR变体 V是表决器 动态冗余系统 优点像N模并行系统&#xff0c;后边加一个故障检测和系统重构百分之90以上的故障都是瞬时故障&#xff0c;检测到故障重新运行即可如果出现老化&#xff0c;可以用Spare-1替代 混合冗余系…

HealthBench医疗AI评估基准:技术路径与核心价值深度分析(上)

引言:医疗AI评估的新范式 在人工智能技术迅猛发展的当下,医疗AI系统已逐渐从实验室走向临床应用。然而,医疗领域的特殊性要求这些系统不仅需要在技术指标上表现出色,更需要在实际临床场景中展现出可靠、安全且有效的性能。长期以来,医疗AI评估领域面临着三个核心挑战:评…

中国就业人口现状分析与未来趋势预测

目录 1、核心摘要 2、就业人口总量与趋势 就业人口规模 产业结构变化 3、未来就业趋势 2030年就业变革 人口结构影响 技能需求变化 4、年龄结构与老龄化影响 老龄化现状 抚养比变化 6、老龄化经济影响 消费结构变化 创新活力 7、行业分布与数字经济 行业就业结…