【网络入侵检测】基于Suricata源码分析FlowManager实现

article/2025/8/15 23:03:52

【作者主页】只道当时是寻常

【专栏介绍】Suricata入侵检测。专注网络、主机安全,欢迎关注与评论。

1. 概要

👋 本文聚焦开源网络入侵检测系统 Suricata 的核心模块 FlowManager,深入解读其源码,解析流量管理实现机制。FlowManager 承担流的超时检查、资源回收、紧急模式处理等关键功能。文章从内存压力值计算、动态调整休眠间隔、流状态切换逻辑入手,结合代码实例,说明其如何动态调节扫描行数与休眠时间,平衡负载与检测效率。

2. 源码解析

2.1 模块注册(Register)

在 Suricata 初始化阶段会将 FlowManager 模块注册到全局模块列表中 tmm_modules,后续由专门的线程负责执行该模块的主函数。

Flow Manager模块详细信息如下代码所示:

void TmModuleFlowManagerRegister (void)
{tmm_modules[TMM_FLOWMANAGER].name = "FlowManager";tmm_modules[TMM_FLOWMANAGER].ThreadInit = FlowManagerThreadInit;tmm_modules[TMM_FLOWMANAGER].ThreadDeinit = FlowManagerThreadDeinit;tmm_modules[TMM_FLOWMANAGER].Management = FlowManager;tmm_modules[TMM_FLOWMANAGER].cap_flags = 0;tmm_modules[TMM_FLOWMANAGER].flags = TM_FLAG_MANAGEMENT_TM;SCLogDebug("%s registered", tmm_modules[TMM_FLOWMANAGER].name);SC_ATOMIC_INIT(flowmgr_cnt);SC_ATOMIC_INITPTR(flow_timeouts);
}

2.2 初始化(Init)

在 Suricata 中 FlowManagerThreadInit 函数实现 FlowManager 线程的初始化操作。下面我将基于 FlowManagerThreadInit 函数源码分段介绍初始化具体流程。

(1)创建 FlowManager 线程数据对象,用于保存当前线程的初始化数据。

FlowManagerThreadData *ftd = SCCalloc(1, sizeof(FlowManagerThreadData));
if (ftd == NULL)return TM_ECODE_FAILED;

(2)配置当前线程实例序号值。每个新增一个 FlowManager 线程示例序号值加一。

ftd->instance = SC_ATOMIC_ADD(flowmgr_cnt, 1);
SCLogDebug("flow manager instance %u", ftd->instance);

(3)在 Suricata 的配置文件中,可通过 flow.managers 来设置创建 FlowManager 线程的数量,flow.hash-size 设置了哈希表的大小。因此,这里需要计算每个 FlowManager 线程负责维护哈希表的范围。如果是最后一个线程示例则其右侧区间为哈希表的结尾。

 /* set the min and max value used for hash row walking* each thread has it's own section of the flow hash */
uint32_t range = flow_config.hash_size / flowmgr_number;ftd->min = ftd->instance * range;
ftd->max = (ftd->instance + 1) * range;/* last flow-manager takes on hash_size % flowmgr_number extra rows */
if ((ftd->instance + 1) == flowmgr_number) {ftd->max = flow_config.hash_size;
}
BUG_ON(ftd->min > flow_config.hash_size || ftd->max > flow_config.hash_size);SCLogDebug("instance %u hash range %u %u", ftd->instance, ftd->min, ftd->max);

(4)初始化 FlowManager 模块的计数器。

FlowCountersInit(t, &ftd->cnt);

(5)初始化报文池,即每个处理线程支持同时处理的最大报文数量(max_pending_packets)。很奇怪,为什么这个函数在这里调用,该函数貌似和 FlowManager 没有什么联系。

PacketPoolInit();

2.3 流管理(FlowManager)

Suricata 的 FlowManager 函数是流管理模块的核心函数,主要用于检查流超时并回收资源。

FlowManager 函数中最核心的是其资源处理算法的实现,如果要理解这个算法需要先了解以下几个实现。

2.3.1 内存压力值

MemcapsGetPressure 函数用户获取当前内存压力值,该函数主要关注最容易成为内存压力瓶颈的四个引擎,分别是streamstream-reassemblyflow applayer-proto-http,并取其压力最大值。

如何计算每个引擎的压力值呢?

下面代码是 MemcapsGetPressure 函数的实现:

float MemcapsGetPressure(void)
{float percent = 0.0;for (int i = 0; i < 4; i++) { // only flow, streams, httpuint64_t memcap = memcaps[i].GetFunc();if (memcap) {uint64_t memuse = memcaps[i].GetMemuseFunc();float p = (float)((double)memuse / (double)memcap);// SCLogNotice("%s: memuse %"PRIu64", memcap %"PRIu64" => %f%%",//    memcaps[i].name, memuse, memcap, (p * 100));percent = MAX(p, percent);}}return percent;
}

可以看到,内存压力值算法是当前已用内存量除以申请的内存量。压力值为 0.0 表示未使用已申请的内存,1.0 表示内存使用达到上限。

2.3.2 动态调整休眠间隔

GetWorkUnitSizing 函数会根据系统内存压力值,即 MemcapsGetPressure 函数计算结果动态计算每次处理的哈希表行数和休眠间隔。下面通过拆解 GetWorkUnitSizing 函数实现来理解其算法原理:

(1)如果当前处于紧急状态(emergency),则需要流管理线程全速处理。要求线程实例处理哈希表所有行(即该线程实例负责的哈希表所有行),线程休眠间隔固定为250ms。

if (emergency) {*wu_rows = rows;*wu_sleep = 250;return;
}

(2)将内存压力值从百分比转成0~100的整数。要求压力值最小为10。

/* minimum busy score is 10 */
const uint32_t emp = MAX(mp, 10);

(3)计算每秒处理的哈希表行数,取决于内存压力值百分比。

const uint32_t rows_per_sec = (uint32_t)((float)rows * (float)((float)emp / (float)100));

(4)假设每行需要消耗的时间为1微秒,那么每秒处理的哈希行数需要消耗多长时间(最大为1s)。

 /* calc how much time we estimate the work will take, in ms. We assume* each row takes an average of 1usec. Maxing out at 1sec. */
const uint32_t work_per_unit = MIN(rows_per_sec / 1000, 1000);

(5)计算处理完指定的哈希行数后,剩余的时间即为休眠间隔时间,最小为250ms。

/* calc how much time we need to sleep to get to the per second cadence* but sleeping for at least 250ms. */
const uint32_t sleep_per_unit = MAX(250, 1000 - work_per_unit);
SCLogDebug("mp %u emp %u rows %u rows_sec %u sleep %ums", mp, emp, rows, rows_per_sec,sleep_per_unit);

2.3.3 源码解析

在了解2.3.1 和 2.3.2 两个算法实现的基础上,我们再来通过拆解 FlowManager 函数实现来了解流管理模块的实现流程。

(1)获取当前内存压力值。

uint32_t mp = MemcapsGetPressure() * 100;

(2)获取当前线程休眠时间。

GetWorkUnitSizing(rows, mp, false, &sleep_per_wu, &rows_per_wu, &rows_sec);

(3)判断当前线程是否被暂停,如果被暂停则会等待,直到取消暂停。

if (TmThreadsCheckFlag(th_v, THV_PAUSE)) {TmThreadsSetFlag(th_v, THV_PAUSED);TmThreadTestThreadUnPaused(th_v);TmThreadsUnsetFlag(th_v, THV_PAUSED);
}

(4)检查紧急模式状态。

bool emerg = ((SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) != 0);

(5)获取当前时间戳。

 /* Get the time */
ts = TimeGet();
SCLogDebug("ts %" PRIdMAX "", (intmax_t)SCTIME_SECS(ts));
uint64_t ts_ms = SCTIME_MSECS(ts);

(6)设置流进入紧急模式。其中 prev_emerg 表示上一个流是否处于紧急模式,若否,则说明当前流进入新的紧急模式阶段。

const bool emerge_p = (emerg && !prev_emerg);
if (emerge_p) {next_run_ms = 0;prev_emerg = true;SCLogNotice("Flow emergency mode entered...");StatsIncr(th_v, ftd->cnt.flow_emerg_mode_enter);
}

(7)执行流超时检查,next_run_ms = ts_ms + sleep_per_wu; 即期望下一次执行流检查时的时间戳。

if (ts_ms >= next_run_ms) {... ...
}

(8)若处于紧急模式,需处理该线程实例负责的哈希表中所有行的超时;否则仅处理部分行。FlowTimeoutHash 和 FlowTimeoutHashInChunks 函数用于检查哈希表中指定行数的超时流,若发现超时流,则将其从哈希表移除并添加到 d->aside_queue 队列。

if (emerg) {/* in emergency mode, do a full pass of the hash table */FlowTimeoutHash(&ftd->timeout, ts, ftd->min, ftd->max, &counters);StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);
} else {SCLogDebug("hash %u:%u slice starting at %u with %u rows", ftd->min, ftd->max, pos,rows_per_wu);const uint32_t ppos = pos;FlowTimeoutHashInChunks(&ftd->timeout, ts, ftd->min, ftd->max, &counters, rows_per_wu, &pos);if (ppos > pos) {StatsIncr(th_v, ftd->cnt.flow_mgr_full_pass);}
}

(9)计算当前空闲流的百分比,当空闲流百分比超过恢复阈值,则紧急模式计数器加一直到连续30次超过恢复阈值,退出紧急模式。

const uint32_t spare_pool_len = FlowSpareGetPoolSize();
StatsSetUI64(th_v, ftd->cnt.flow_mgr_spare, (uint64_t)spare_pool_len);FlowCountersUpdate(th_v, ftd, &counters);if (emerg == true) {SCLogDebug("flow_sparse_q.len = %" PRIu32 " prealloc: %" PRIu32"flow_spare_q status: %" PRIu32 "%% flows at the queue",spare_pool_len, flow_config.prealloc,spare_pool_len * 100 / MAX(flow_config.prealloc, 1));/* only if we have pruned this "emergency_recovery" percentage* of flows, we will unset the emergency bit */if ((spare_pool_len * 100 / MAX(flow_config.prealloc, 1)) >flow_config.emergency_recovery) {emerg_over_cnt++;} else {emerg_over_cnt = 0;}if (emerg_over_cnt >= 30) {SC_ATOMIC_AND(flow_flags, ~FLOW_EMERGENCY);FlowTimeoutsReset();emerg = false;prev_emerg = false;emerg_over_cnt = 0;SCLogNotice("Flow emergency mode over, back to normal... unsetting"" FLOW_EMERGENCY bit (ts.tv_sec: %" PRIuMAX ", ""ts.tv_usec:%" PRIuMAX ") flow_spare_q status(): %" PRIu32"%% flows at the queue",(uintmax_t)SCTIME_SECS(ts), (uintmax_t)SCTIME_USECS(ts),spare_pool_len * 100 / MAX(flow_config.prealloc, 1));StatsIncr(th_v, ftd->cnt.flow_emerg_mode_over);}
}/* update work units */
const uint32_t pmp = mp;
mp = MemcapsGetPressure() * 100;
if (ftd->instance == 0) {StatsSetUI64(th_v, ftd->cnt.memcap_pressure, mp);StatsSetUI64(th_v, ftd->cnt.memcap_pressure_max, mp);
}
GetWorkUnitSizing(rows, mp, emerg, &sleep_per_wu, &rows_per_wu, &rows_sec);
if (pmp != mp) {StatsSetUI64(th_v, ftd->cnt.flow_mgr_rows_sec, rows_sec);
}next_run_ms = ts_ms + sleep_per_wu;

(10)定期执行其它模块超时检查的逻辑,只有第一个流量管理实例(instance 0)执行这些操作,避免多线程重复处理。

if (other_last_sec == 0 || other_last_sec < (uint32_t)SCTIME_SECS(ts)) {if (ftd->instance == 0) {DefragTimeoutHash(ts);HostTimeoutHash(ts);IPPairTimeoutHash(ts);HttpRangeContainersTimeoutHash(ts);other_last_sec = (uint32_t)SCTIME_SECS(ts);}
}

(11)如果处于紧急模式,现成休眠250ms,如果未处于紧急模式,计算休眠时间,通过带超时的条件变量对线程进行休眠操作。

if (emerg || !time_is_live) {usleep(250);
} else {struct timeval cond_tv;gettimeofday(&cond_tv, NULL);struct timeval add_tv;add_tv.tv_sec = 0;add_tv.tv_usec = (sleep_per_wu * 1000);timeradd(&cond_tv, &add_tv, &cond_tv);struct timespec cond_time = FROM_TIMEVAL(cond_tv);SCCtrlMutexLock(&flow_manager_ctrl_mutex);while (1) {int rc = SCCtrlCondTimedwait(&flow_manager_ctrl_cond, &flow_manager_ctrl_mutex, &cond_time);if (rc == ETIMEDOUT || rc < 0)break;if (SC_ATOMIC_GET(flow_flags) & FLOW_EMERGENCY) {break;}}SCCtrlMutexUnlock(&flow_manager_ctrl_mutex);
}


http://www.hkcw.cn/article/alCtdxcCNf.shtml

相关文章

永辉超市及其法人被限消 涉服务合同纠纷

企查查APP显示,永辉超市及其法定代表人张轩松近日被限制高消费。原因是未按执行通知书指定的期间履行生效法律文书确定的给付义务。申请人是郑州市唐科废旧物资回收有限公司,涉及服务合同纠纷案件。此前,永辉超市已因此案被执行超过39万元。责任编辑:zhangxiaohua

一条龙舟值套房?揭秘“水上超跑” 龙舟漂移绝技

一条龙舟值套房?揭秘“水上超跑” 龙舟漂移绝技!端午节即将到来,全国各地的龙舟赛陆续开赛。广东佛山叠滘的龙舟因其独特的技术和风格被网友们称为“银河系唯一水上F1”。叠滘龙舟讲究人舟合一,每条龙舟需40多人默契配合,舵手凭经验预判水流和角度,队员瞬间调整重心,这种…

声援魏建军,何小鹏呼吁卷科技而非低价 未来选车看算力

小鹏汽车CEO何小鹏在最近的一次媒体访问中,针对魏建军近期发言和比亚迪降价相关问题发表了自己的看法。他认为,汽车行业不应只关注价格竞争,而应注重科技发展,特别是向具身智能方向迈进。不久前,长城汽车董事长魏建军指出,当前中国汽车产业面临资本裹挟、恶性竞争和技术趋…

为什么美国政府要对高校穷追猛打 连番施压背后

为什么美国政府要对高校穷追猛打 连番施压背后!最近,美国政府对哈佛大学采取了一系列严厉措施,包括禁止招收国际学生、冻结政府拨款以及威胁取消其免税地位。哈佛大学对此毫不退让,选择了对抗。苏晓晖认为,美国总统接连对哈佛大学施压,表面上打着打击校园“反犹主义”和去…

特朗普留学生禁令被“叫停”,这场大戏远没有结束 京酿馆 哈佛反击战持续升级

近段时间,美国特朗普政府对哈佛大学进行了一系列打压,引发了全球关注。据财联社报道,当地时间5月29日,美国马萨诸塞州联邦地区法院一名法官发布了一项命令,叫停了特朗普政府禁止哈佛大学招收外国学生的政策。听证会后,法院网站信息显示,此前发布的临时限制令将继续有效,…

欧冠官方晒近11年决赛比分 为决赛预热

北京时间6月1日,巴黎圣日耳曼将在慕尼黑与国际米兰争夺本赛季欧冠冠军。为了预热本次决赛,欧冠联赛官方回顾了近11年的欧冠决赛赛果。2024年伦敦,皇马以2-0战胜多特;2023年伊斯坦布尔,曼城以1-0击败国米;2022年巴黎,皇马以1-0战胜利物浦;2021年波尔图,切尔西以1-0击败…

Linux发行版本的安装

目录 一、彻底删除VMware 二、VMware-17虚拟机安装 三、MobaXterm 安装 四、Centos 发行版 7.9的安装 五、rockys 9.1的安装 六、ubuntu2204的安装 一、彻底删除VMware 在卸载VMware虚拟机之前&#xff0c;要先把与VMware相关的服务和进程终止 1. 在windows中按下【Windo…

云计算、大数据、人工智能、物联网、虚拟现实技术、区块链技术

先自我介绍一下&#xff0c;小编浙江大学毕业&#xff0c;去过华为、字节跳动等大厂&#xff0c;目前阿里P7 深知大多数程序员&#xff0c;想要提升技能&#xff0c;往往是自己摸索成长&#xff0c;但自己不成体系的自学效果低效又漫长&#xff0c;而且极易碰到天花板技术停滞…

云计算与大数据进阶 | 26、解锁云架构核心:深度解析可扩展数据库的5大策略与挑战(上)

在云应用/服务的 5 层架构里&#xff0c;数据库服务层稳坐第 4 把交椅&#xff0c;堪称其中的 “硬核担当”。它的复杂程度常常让人望而生畏&#xff0c;不少人都将它视为整个架构中的 “终极挑战”。 不过&#xff0c;也有人觉得可扩展存储系统才是最难啃的 “硬骨头”&#…

云原生--基础篇-3--云原生概述(云、原生、云计算、核心组成、核心特点)

1、什么是云和原生 &#xff08;1&#xff09;、什么是云&#xff1f; “云”指的是云计算环境&#xff0c;代表应用运行的基础设施和资源。依赖并充分利用云计算的弹性、分布式和资源池化能力。 核心含义&#xff1a; 1、云计算基础设施 云原生应用的设计和运行完全基于云…

云计算概念技术与架构:全面掌握云计算核心功能与场景

云计算概念技术与架构&#xff1a;全面掌握云计算核心功能与场景 【下载地址】云计算概念技术与架构 《云计算&#xff1a;概念、技术与架构》是一本全面深入的云计算指南&#xff0c;由Thomas Erl、Zaigham Mahmood和Ricardo Puttini共同撰写。本书详细解析了云计算的基础概念…

肝了半年,我整理出了这篇云计算学习路线(新手必备,从入门到精通)

大家好&#xff01;我是凯哥&#xff0c;今天给大家分享一下云计算学习路线图。这是我按照自己最开始学习云计算的时候的学习路线&#xff0c;并且结合自己从业多年所涉及的知识精心总结的云计算的思维导图。这是凯哥精心总结的&#xff0c;花费了不少精力哦&#xff0c;希望对…

探索虚拟化:云计算时代的资源优化之道

前言 如果您想知道云提供商如何在全球范围内运行无数应用程序&#xff0c;而每个应用程序都没有机架服务器&#xff0c;那么答案就在于虚拟化。 它是为云提供支持的核心技术之一&#xff0c;在幕后悄悄工作&#xff0c;使现代计算高效、可扩展且具有成本效益。 在本文中&#x…

Linux云计算运维笔记:掌握云计算与Linux运维核心技能

Linux云计算运维笔记&#xff1a;掌握云计算与Linux运维核心技能 【下载地址】Linux云计算运维笔记 《Linux云计算运维笔记》是一份全面且实用的资源文件&#xff0c;专为Linux运维与云计算领域的从业者打造。它涵盖了从自动化运维、容器部署到服务器管理、数据库优化等关键技能…

【机密计算顶会解读】07:eOPF——用于机密云计算的可扩展编排和保护框架

导读&#xff1a;本文介绍eOPF框架&#xff0c;通过监控enclave与OS交互&#xff0c;从而提供细粒度控制、共同认证和侧信道防御。 原文链接&#xff1a;An Extensible Orchestration and Protection Framework for Confidential Cloud Computing | USENIX An Extensible Orc…

SaaS基于云计算、大数据的Java云HIS平台信息化系统源码

利用云计算、大数据等现代信息技术研发的医疗信息管理系统&#xff08;HIS&#xff09;实现了医院信息化从局域网向互联网转型&#xff0c;重新定义医疗卫生信息化建设的理念、构架、功能和运维体系。平台构建了以患者为中心的云架构、云服务、云运维的信息体系&#xff0c;实现…

人工智能:所有144本SCI期刊都在这里(20本Top,4本On Hold)

本周投稿推荐 SCI&EI • 4区“水刊”&#xff0c;纯正刊&#xff08;来稿即录&#xff09; • CCF-B类&#xff0c;IEEE一区-Top&#xff08;3天初审&#xff09; EI • 各领域沾边均可&#xff08;2天录用&#xff09; 知网&#xff08;CNKI&#xff09;、谷歌学术 …

全网最最最详细的haproxy详解!!!

1 什么是负载均衡 负载均衡&#xff08;Load Balancing&#xff09;是一种将网络请求或工作负载分散到多个服务器或计算机资源上的技术&#xff0c;以实现优化资源使用、提高系统吞吐量、增强数据冗余和故障容错能力、以及减少响应时间的目的。在分布式系统、云计算环境、Web服…

卤鹅哥揭秘去美国给甲亢哥带的礼物 为中外文化交流的鲜活注脚

2025年5月,一则消息引爆全网:重庆荣昌卤鹅哥林江即将启程赴美,与顶流网红“甲亢哥”(IShowSpeed)再续跨国情谊。这位因“追星”走红的非遗推广大使,不仅带上了家乡的招牌卤鹅,更精心筹备了一份融合非遗文化与美食的“中国礼包”,更透露最想与甲亢哥即兴来一段说唱。这场…

龙舟赛辽宁队把对手“干翻”了 网友:今年有进步了,只干翻了一条

2025南充嘉陵江龙舟赛,辽宁队再“撞”佳绩。网友:辽宁队今年已经有进步了,去年干翻三条船,今年只干翻了一条。责任编辑:zx0002