mongodb源码分析session接受客户端find命令过程

article/2025/9/4 9:34:01

mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制。

现在继续研究ASIOSession和connection是怎么接受客户端命令的?

mongo/transport/service_state_machine.cpp核心方法有:

enum class State {Created,     // The session has been created, but no operations have been performed yetSource,      // Request a new Message from the network to handleSourceWait,  // Wait for the new Message to arrive from the networkProcess,     // Run the Message through the databaseSinkWait,    // Wait for the database result to be sent by the networkEndSession,  // End the session - the ServiceStateMachine will be invalid after thisEnded        // The session has ended. It is illegal to call any method besides// state() if this is the current state.};void _processMessage()
void _sinkCallback()
void _sourceCallback()
void _sourceMessage()
void _sinkMessage()
void _runNextInGuard()

mongo第一条命令状态转变流程是:State::Created 》 State::Source 》State::SourceWait 》 State::Process 》 State::SinkWait  》 State::Source 》State::SourceWait

状态解释:State::Created,     //session刚刚创建,但是还没有接受任何命令
            State::Source,      //去接受客户端新的命令
            State::SourceWait,  // 等待客户端新的命令
            State::Process,     // 将接受的命令发送给mongodb数据库
           State:: SinkWait,    // 等待将命令的执行结果返回给客户端

mongo/transport/service_state_machine.cpp核心方法循环调用的流程图下:

mongo/transport/service_state_machine.cpp核心_runNextInGuard方法主要判断状态State::Source和State::Process,State::Source主要session等待客户端请求find命令State::Process将命令发送给mongodb数据库, _runNextInGuard代码如下:

void ServiceStateMachine::_runNextInGuard(ThreadGuard guard) {auto curState = state();dassert(curState != State::Ended);// If this is the first run of the SSM, then update its state to Sourceif (curState == State::Created) {curState = State::Source;_state.store(curState);}// Make sure the current Client got set correctlydassert(Client::getCurrent() == _dbClientPtr);try {switch (curState) {case State::Source:LOG(1) << "conca _runNextInGuard  State::Source" ;_sourceMessage(std::move(guard));break;case State::Process:LOG(1) << "conca _runNextInGuard  State::Process" ;_processMessage(std::move(guard));break;case State::EndSession:LOG(1) << "conca _runNextInGuard  State::EndSession" ;_cleanupSession(std::move(guard));break;default:MONGO_UNREACHABLE;}return;} catch (const DBException& e) {log() << "DBException handling request, closing client connection: " << redact(e);}if (!guard) {guard = ThreadGuard(this);}_state.store(State::EndSession);_cleanupSession(std::move(guard));
}

session等待connection请求,状态转变流程:State::Created 》 State::Source 》State::SourceWait,

void ServiceStateMachine::_sourceMessage(ThreadGuard guard) {invariant(_inMessage.empty());invariant(_state.load() == State::Source);LOG(1) << "conca _sourceMessage State::Source";_state.store(State::SourceWait);LOG(1) << "conca _sourceMessage store State::SourceWait";guard.release();auto sourceMsgImpl = [&] {if (_transportMode == transport::Mode::kSynchronous) {MONGO_IDLE_THREAD_BLOCK;return Future<Message>::makeReady(_session()->sourceMessage());} else {invariant(_transportMode == transport::Mode::kAsynchronous);return _session()->asyncSourceMessage();}};sourceMsgImpl().getAsync([this](StatusWith<Message> msg) {if (msg.isOK()) {_inMessage = std::move(msg.getValue());invariant(!_inMessage.empty());}_sourceCallback(msg.getStatus());});
}

 _sourceMessage主要处理状态State::Source 》State::SourceWait,等待session接受消息。_session()->asyncSourceMessage()方法session异步等待客户端发送的find命令消息。

如果有find命令到来则调用_sourceCallback(msg.getStatus());_sourceCallback方法代码如下:

void ServiceStateMachine::_sourceCallback(Status status) {// The first thing to do is create a ThreadGuard which will take ownership of the SSM in this// thread.ThreadGuard guard(this);// Make sure we just called sourceMessage();LOG(1) << "conca _sinkMessage State::SinkWait";dassert(state() == State::SourceWait);auto remote = _session()->remote();if (status.isOK()) {_state.store(State::Process);LOG(1) << "conca _sinkMessage store State::Process";// Since we know that we're going to process a message, call scheduleNext() immediately// to schedule the call to processMessage() on the serviceExecutor (or just unwind the// stack)// If this callback doesn't own the ThreadGuard, then we're being called recursively,// and the executor shouldn't start a new thread to process the message - it can use this// one just after this returns.return _scheduleNextWithGuard(std::move(guard),ServiceExecutor::kMayRecurse,transport::ServiceExecutorTaskName::kSSMProcessMessage);} else if (ErrorCodes::isInterruption(status.code()) ||ErrorCodes::isNetworkError(status.code())) {LOG(1) << "Session from " << remote<< " encountered a network error during SourceMessage: " << status;_state.store(State::EndSession);} else if (status == TransportLayer::TicketSessionClosedStatus) {// Our session may have been closed internally.LOG(1) << "Session from " << remote << " was closed internally during SourceMessage";_state.store(State::EndSession);} else {log() << "Error receiving request from client: " << status << ". Ending connection from "<< remote << " (connection id: " << _session()->id() << ")";_state.store(State::EndSession);}// There was an error receiving a message from the client and we've already printed the error// so call runNextInGuard() to clean up the session without waiting._runNextInGuard(std::move(guard));
}

收到find命令转给mongodb执行find命令,状态转变:State::SourceWait》 State::Process,继续调用_scheduleNextWithGuard 》 schedule 调度 》 _runNextInGuard(上面已经存在,反复调用这个方法)。

现在_runNextInGuard方法里面状态State::Process,所以继续调用的方法是_processMessage执行mongodb数据库命令,接受mongodb数据库返回的数据,_processMessage详细代码如下:

void ServiceStateMachine::_processMessage(ThreadGuard guard) {invariant(!_inMessage.empty());LOG(1) << "conca _processMessage";TrafficRecorder::get(_serviceContext).observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), _inMessage);auto& compressorMgr = MessageCompressorManager::forSession(_session());_compressorId = boost::none;if (_inMessage.operation() == dbCompressed) {MessageCompressorId compressorId;auto swm = compressorMgr.decompressMessage(_inMessage, &compressorId);uassertStatusOK(swm.getStatus());_inMessage = swm.getValue();_compressorId = compressorId;}networkCounter.hitLogicalIn(_inMessage.size());// Pass sourced Message to handler to generate response.auto opCtx = Client::getCurrent()->makeOperationContext();// The handleRequest is implemented in a subclass for mongod/mongos and actually all the// database work for this request.DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage);// opCtx must be destroyed here so that the operation cannot show// up in currentOp results after the response reaches the clientopCtx.reset();// Format our response, if we have oneMessage& toSink = dbresponse.response;if (!toSink.empty()) {invariant(!OpMsg::isFlagSet(_inMessage, OpMsg::kMoreToCome));invariant(!OpMsg::isFlagSet(toSink, OpMsg::kChecksumPresent));// Update the header for the response message.toSink.header().setId(nextMessageId());toSink.header().setResponseToMsgId(_inMessage.header().getId());if (OpMsg::isFlagSet(_inMessage, OpMsg::kChecksumPresent)) {
#ifdef MONGO_CONFIG_SSLif (!SSLPeerInfo::forSession(_session()).isTLS) {OpMsg::appendChecksum(&toSink);}
#elseOpMsg::appendChecksum(&toSink);
#endif}// If the incoming message has the exhaust flag set and is a 'getMore' command, then we// bypass the normal RPC behavior. We will sink the response to the network, but we also// synthesize a new 'getMore' request, as if we sourced a new message from the network. This// new request is sent to the database once again to be processed. This cycle repeats as// long as the associated cursor is not exhausted. Once it is exhausted, we will send a// final response, terminating the exhaust stream._inMessage = makeExhaustMessage(_inMessage, &dbresponse);_inExhaust = !_inMessage.empty();networkCounter.hitLogicalOut(toSink.size());if (_compressorId) {auto swm = compressorMgr.compressMessage(toSink, &_compressorId.value());uassertStatusOK(swm.getStatus());toSink = swm.getValue();}TrafficRecorder::get(_serviceContext).observe(_sessionHandle, _serviceContext->getPreciseClockSource()->now(), toSink);_sinkMessage(std::move(guard), std::move(toSink));LOG(1) << "conca _processMessage _sinkMessage";} else {_state.store(State::Source);_inMessage.reset();LOG(1) << "conca _processMessage store(State::Source)";return _scheduleNextWithGuard(std::move(guard),ServiceExecutor::kDeferredTask,transport::ServiceExecutorTaskName::kSSMSourceMessage);}
}

 DbResponse dbresponse = _sep->handleRequest(opCtx.get(), _inMessage)将接受的find命令发送给mongodb数据库,mongodb数据库执行逻辑,返回响应结果。

最后调用_sinkMessage,将mongodb结果数据发送给客户端。状态转变流程:State::Process 》 State::SinkWait

void ServiceStateMachine::_sinkMessage(ThreadGuard guard, Message toSink) {// Sink our response to the clientinvariant(_state.load() == State::Process);LOG(1) << "conca _sinkMessage State::Source";_state.store(State::SinkWait);LOG(1) << "conca _sinkMessage store State::SinkWait";guard.release();auto sinkMsgImpl = [&] {if (_transportMode == transport::Mode::kSynchronous) {// We don't consider ourselves idle while sending the reply since we are still doing// work on behalf of the client. Contrast that with sourceMessage() where we are waiting// for the client to send us more work to do.return Future<void>::makeReady(_session()->sinkMessage(std::move(toSink)));} else {invariant(_transportMode == transport::Mode::kAsynchronous);return _session()->asyncSinkMessage(std::move(toSink));}};sinkMsgImpl().getAsync([this](Status status) { _sinkCallback(std::move(status)); });
}

_session()->asyncSinkMessage(std::move(toSink))发送结果给客户端,成功之后继续调用_sinkCallback。

void ServiceStateMachine::_sinkCallback(Status status) {// The first thing to do is create a ThreadGuard which will take ownership of the SSM in this// thread.ThreadGuard guard(this);LOG(1) << "conca _sinkCallback State::SinkWait";dassert(state() == State::SinkWait);// If there was an error sinking the message to the client, then we should print an error and// end the session. No need to unwind the stack, so this will runNextInGuard() and return.//// Otherwise, update the current state depending on whether we're in exhaust or not, and call// scheduleNext() to unwind the stack and do the next step.if (!status.isOK()) {log() << "Error sending response to client: " << status << ". Ending connection from "<< _session()->remote() << " (connection id: " << _session()->id() << ")";_state.store(State::EndSession);return _runNextInGuard(std::move(guard));} else if (_inExhaust) {_state.store(State::Process);LOG(1) << "conca _sinkCallback store State::Process";return _scheduleNextWithGuard(std::move(guard),ServiceExecutor::kDeferredTask |ServiceExecutor::kMayYieldBeforeSchedule,transport::ServiceExecutorTaskName::kSSMExhaustMessage);} else {_state.store(State::Source);LOG(1) << "conca _sinkCallback store State::Source";return _scheduleNextWithGuard(std::move(guard),ServiceExecutor::kDeferredTask |ServiceExecutor::kMayYieldBeforeSchedule,transport::ServiceExecutorTaskName::kSSMSourceMessage);}
}

session发完消息之后, State::SinkWait 》 State::Source 到此位置这个find命令已经完成,_scheduleNextWithGuard后面继续等待新的命令到来。

上面find命令打印日志如下: 

2025-04-24T21:24:02.580+0800 D1 NETWORK  [conn1] conca _sourceMessage State::Source
2025-04-24T21:24:02.581+0800 D1 NETWORK  [conn1] conca _sourceMessage store State::SourceWait
2025-04-24T21:24:09.957+0800 D1 NETWORK  [conn1] conca _sinkMessage State::SinkWait
2025-04-24T21:24:09.957+0800 D1 NETWORK  [conn1] conca _sinkMessage store State::Process
2025-04-24T21:24:09.958+0800 D1 NETWORK  [conn1] conca func end
2025-04-24T21:24:09.961+0800 D1 NETWORK  [conn1] conca guard.release() end
2025-04-24T21:24:09.962+0800 D1 NETWORK  [conn1] conca _runNextInGuard  State::Process
2025-04-24T21:24:09.963+0800 D1 NETWORK  [conn1] conca _processMessage
2025-04-24T21:24:09.964+0800 D1 NETWORK  [conn1] conca _processMessage _sep->handleRequest
2025-04-24T21:24:09.965+0800 I  SHARDING [conn1] Marking collection db.user as collection version: <unsharded>
2025-04-24T21:24:09.969+0800 I  COMMAND  [conn1] command db.user appName: "MongoDB Shell" command: find { find: "user", filter: {}, lsid: { id: UUID("7ae50c73-fcc6-4d15-8a80-7f2bf2192e0f") }, $db: "db" } planSummary: COLLSCAN keysExamined:0 docsExamined:6 cursorExhausted:1 numYields:0 nreturned:6 reslen:421 locks:{ ReplicationStateTransition: { acquireCount: { w: 1 } }, Global: { acquireCount: { r: 1 } }, Database: { acquireCount: { r: 1 } }, Collection: { acquireCount: { r: 1 } }, Mutex: { acquireCount: { r: 1 } } } storage:{ data: { bytesRead: 416 } } protocol:op_msg 3ms
2025-04-24T21:24:09.975+0800 D1 NETWORK  [conn1] conca _processMessage _sep->handleRequest dbresponse
2025-04-24T21:24:09.978+0800 D1 NETWORK  [conn1] conca _processMessage !toSink.empty()
2025-04-24T21:24:09.978+0800 D1 NETWORK  [conn1] conca _processMessage makeExhaustMessage
2025-04-24T21:24:09.979+0800 D1 NETWORK  [conn1] conca _processMessage TrafficRecorder::get(_serviceContext) .observe
2025-04-24T21:24:09.980+0800 D1 NETWORK  [conn1] conca _processMessage _sinkMessage
2025-04-24T21:24:09.981+0800 D1 NETWORK  [conn1] conca _sinkMessage State::Source
2025-04-24T21:24:09.986+0800 D1 NETWORK  [conn1] conca _sinkMessage store State::SinkWait
2025-04-24T21:24:09.986+0800 D1 NETWORK  [conn1] conca _sinkCallback State::SinkWait
2025-04-24T21:24:09.987+0800 D1 NETWORK  [conn1] conca _sinkCallback store State::Source
2025-04-24T21:24:09.988+0800 D1 NETWORK  [conn1] conca func end
2025-04-24T21:24:09.989+0800 D1 NETWORK  [conn1] conca guard.release() end
2025-04-24T21:24:09.990+0800 D1 NETWORK  [conn1] conca_serviceExecutor->schedule(std::move(func), flags, taskName)
2025-04-24T21:24:09.991+0800 D1 NETWORK  [conn1] conca_serviceExecutor->schedule(std::move(func), flags, taskName)
2025-04-24T21:24:09.992+0800 D1 NETWORK  [conn1] conca _runNextInGuard  State::Source
2025-04-24T21:24:09.993+0800 D1 NETWORK  [conn1] conca _sourceMessage State::Source
2025-04-24T21:24:09.994+0800 D1 NETWORK  [conn1] conca _sourceMessage store State::SourceWait

总结:

mongo第一条命令流程是:State::Created 》 State::Source 》State::SourceWait 》 State::Process 》 State::SinkWait  》 State::Source 》State::SourceWait

mongo第二条命令流程是:                                                                                       State::Process 》 State::SinkWait  》 State::Source 》State::SourceWait

mongo第三条命令流程是:                                                                                       State::Process 》 State::SinkWait  》 State::Source 》State::SourceWait


http://www.hkcw.cn/article/ZSJrCNKTBf.shtml

相关文章

酒店管理破局:AI 引领智能化转型

一、酒店行业现状&#xff1a;规模扩张加速与效率瓶颈并存 根据中国五矿证券《中国酒店行业格局分析》报告&#xff0c;国内酒店行业呈现头部集中化趋势。截至2024年第三季度&#xff0c;锦江酒店以13,186家门店、125.8万间客房的规模稳居行业第一[1]。华住集团则以178.68亿元…

大模型深度学习之双塔模型

前言 双塔模型&#xff08;Two-Tower Model&#xff09;是一种在推荐系统、信息检索和自然语言处理等领域广泛应用的深度学习架构。其核心思想是通过两个独立的神经网络&#xff08;用户塔和物品塔&#xff09;分别处理用户和物品的特征&#xff0c;并在共享的语义空间中通过相…

【Java Web】速通CSS

参考笔记:JavaWeb 速通CSS_java css-CSDN博客 目录 一、CSS入门 1. 基本介绍 2. 作用 二、CSS的3种引入方式 1. 行内式 1.1 示例代码 1.2 存在问题 2. 写在head标签的style子标签中 2.1 示例代码 2.2 存在问题 3.以外部文件的形式引入(开发中推荐使用)⭐⭐⭐ 3.1 说明 3…

PostgreSQL安装

我们使用开源的对象关系型数据库--PostgreSQL&#xff0c;它具有高性能、可扩展和支持复杂查询的特性&#xff0c;非常适合现在学习使用。 一.安装PostgreSQL 我用的windows&#xff0c;就在windows上安装。 1.首先访问 PostgreSQL 官方网站https://www.postgresql.org/dow…

C++:栈帧、命名空间、引用

一、前置知识 1.1、栈区&#xff08;Stack&#xff09; 1.1.1、内存分配与回收机制 分配方式​​&#xff1a;由编译器自动管理&#xff0c;通过调整栈指针&#xff08;ESP/RSP&#xff09;实现。 函数调用时&#xff0c;栈指针下移&#xff08;栈从高地址向低地址增长&…

【HarmonyOS 5】鸿蒙应用px,vp,fp概念详解

【HarmonyOS 5】鸿蒙应用px&#xff0c;vp&#xff0c;fp概念详解 一、前言 目前的鸿蒙开发者&#xff0c;大多数是从前端或者传统移动端开发方向&#xff0c;转到鸿蒙应用开发方向。 前端开发同学对于开发范式很熟悉&#xff0c;但是对于工作流程和开发方式是会有不适感&am…

[Rust_1] 环境配置 | vs golang | 程序运行 | 包管理

目录 Rust 环境安装 GoLang和Rust 关于Go 关于Rust Rust vs. Go&#xff0c;优缺点 GoLang的优点 GoLang的缺点 Rust的优点 Rust的缺点 数据告诉我们什么&#xff1f; Rust和Go的主要区别 (1) 性能 (2) 并发性 (3) 内存安全性 (4) 开发速度 (5) 开发者体验 Ru…

Codeforces Round 1024 (Div. 2)

Problem - A - Codeforces 思维题&#xff1a; 如果n不能整除p&#xff0c;就会多出一部分&#xff0c;这个部分可以作为调和者&#xff0c;使整个数组符合要求。 如果n能整除p&#xff0c;没有调和空间&#xff0c;只有看n/p*qm 来看代码&#xff1a; #include <bits/s…

【东枫科技】KrakenSDR 天线阵列设置

标准测向需要五根相同的全向天线。您可以折衷使用更少的天线&#xff0c;但为了获得最佳性能&#xff0c;我们建议使用全部五根天线。这些天线通常是磁铁安装的鞭状天线&#xff0c;或偶极子天线。我们建议始终使用均匀圆形阵列 (UCA) 天线&#xff0c;因为它可以确定来自各个方…

包含Javascript的HTML静态页面调取本机摄像头

在实际业务开发中&#xff0c;需要在带有摄像头的工作机上拍摄施工现场工作过程的图片&#xff0c;然后上传到服务器备存。 这便需要编写可以运行在浏览器上的代码&#xff0c;并在代码中实现Javascript调取摄像头、截取帧保存为图片的功能。 为了使用户更快掌握JS调取摄像头…

2023年6月第三套第二篇

找和脑子有关系的rather than 不是的意思&#xff0c;不用看 instead表示递进的解释 even when即使不重要&#xff0c;看前方主句 d选项是even when和前方主句的杂糅&#xff0c;往往是错的 instead of 而不是 这道题&#xff0c;有的人觉得避免模仿这时候你会笑&#xff0c;所…

Redis的大Key问题如何解决?

大家好&#xff0c;我是锋哥。今天分享关于【Redis的大Key问题如何解决&#xff1f;】面试题。希望对大家有帮助&#xff1b; Redis的大Key问题如何解决&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Redis中的“大Key”问题是指某个键的值占用了过多…

magic-api配置Git插件教程

一、配置gitee.com 1&#xff0c;生成rsa密钥&#xff0c;在你的电脑右键使用管理员身份运行&#xff08;命令提示符&#xff09;&#xff0c;执行下面命令 ssh-keygen -t rsa -b 2048 -m PEM一直按回车键&#xff0c;不需要输入内容 找到 你电脑中的~/.ssh/id_rsa.pub 文件…

Virtuoso中对GDS文件进行工艺库转换的方法

如果要对相同工艺节点下进行性能评估&#xff0c;可以尝试将一个厂商的GDS文件转换到另一个厂商&#xff0c;不过要注意的是不同厂商&#xff08;比如SMIC和TSMC&#xff09;之间的DRC规则&#xff0c;尽量采用两个DRC中的约束较为紧张的厂商进行设计&#xff0c;以免转换到另外…

【二】9.关于设备树简介

1.什么是设备树&#xff1a; &#xff08;DTS&#xff09;采用树形结构描述扳级设备&#xff0c;也就是开发板上的设备信息&#xff0c;每个设备都是一个节点。 一个SOC可以做出很多不同的板子&#xff0c;这些不同的板子肯定是有共同的信息&#xff0c;将这些共同的信息提取出…

VSCode远程开发-本地SSH隧道保存即时修改

工作环境是一个网站团队几人同时在改&#xff0c;为了减少冲突&#xff0c;我们选择在自己公司服务器上先部署一版线上通用&#xff0c;再连接到不同的本地&#xff0c;这样我们团队可以在线上即时看到他人修改的结果&#xff0c;不用频繁拉取提交推送代码 在线上服务器建一个…

Embedded IDE下载及调试

安装cortex_debug插件 我这边用jlink烧录&#xff0c;其他的根据你自己的来 jlink路径在左下角齿轮设置里 设置位置&#xff1a; 芯片名称配置的都是自动生成的&#xff0c;在eide.json的这里改为你jflash芯片包的设置 调试里也会自动生成一个cortex_debug的调试选项 点旁边的…

lua注意事项

感觉是lua的一大坑啊&#xff0c;它还不如函数内部就局部变量呢 注意函数等内部&#xff0c;全部给加上local得了

【第4章 图像与视频】4.4 离屏 canvas

文章目录 前言为什么要使用 offscreenCanvas为什么要使用 OffscreenCanvas如何使用 OffscreenCanvas第一种使用方式第二种使用方式 计算时长超过多长时间适合用Web Worker 前言 在 Canvas 开发中&#xff0c;我们经常需要处理复杂的图形和动画&#xff0c;这些操作可能会影响页…

长安链起链调用合约时docker ps没有容器的原因

在调用这个命令的时候&#xff0c;发现并没有出现官方预期的合约容器&#xff0c;这是因为我们在起链的时候没有选择用docker的虚拟环境&#xff0c;实际上这不影响后续的调用&#xff0c;如果想要达到官方的效果那么你只需要在起链的时候输入yes即可&#xff0c;如图三所示