进程间通信(消息队列)

article/2025/6/18 16:14:42

目录

一 原理

二 API

1. ftok

2. msgget

3. msgctl

4. msgsnd

5. msgrcv

三 demo代码

四 基于责任链模式和消息队列对数据处理

1. 什么是责任链模式

2. 下面基于责任链模式来对消息队列获取的消息进行处理


前置

其实system v 版本的进程间通信,设计的接口都类似,所以会用一个也就会用其他的了,比如前面的共享内存的shmget获取shmid,本章讲的消息队列提供的msgget获取msgid是一样的,接口都大同小异。

不过消息队列不同于共享内存的是,发送的数据是有类型的,不像共享内存没有类型的数据发送。

下面的消息结构

struct msgbuf {long mtype;     // 谁发送的char mtext[N];  // 发送的消息};

一 原理

1. 和共享内存一样通过系统调用并让物理内存和共享区进行映射。

2. 但不一样的是共享内存通信的时候不需要系统调用,但消息队列需要,所以消息队列读写操作,默认没数据读会阻塞,缓冲区满了写会阻塞,自带同步与互斥。

3. 先通过系统调用建立共享区与物理内存的映射,后续在通过系统调用进行发送和接收消息,发送消息的时候要携带消息id,代表是谁发的,后续收消息要根据这个id区分拿谁发的消息,毕竟总不能自己发自己收吧。

二 API

1. ftok
#include <sys/types.h>
#include <sys/ipc.h>key_t ftok(const char *pathname, int proj_id);

和共享内存一样这里就不详细介绍了,形成唯一的 key_t 类型标识唯一性。

2. msgget
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgget(key_t key,   // ftok的返回值int msgflg   // 和共享内存一样 IPC_CREAT/IPC_EXCL// IPC_CREAT            存在则返回已经存在的,否则创建新的// IPC_CREAT | IPC_EXCL 不纯在则创建新的,纯在则出错返回);// 返回值为0正常,-1错误
3. msgctl
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgctl(int msqid,             // msgget的返回值int cmd,               // 对已经存在的消息队列 CURD 操作struct msqid_ds *buf   // 消息队列的属性字段);// 返回值为0正常,失败-1
4. msgsnd
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>int msgsnd(int msqid,          // msgget的返回值const void *msgp,   // 消息的数据类型结构size_t msgsz,       // 大小(不携带该结构第一个字段 (long mtype))int msgflg          // 怎么发,阻塞/非阻塞等);// 返回值0正常,-1错误
5. msgrcv
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>ssize_t msgrcv(int msqid,     // msgget的返回值 void *msgp,    // 消息结构类型size_t msgsz,  // 大小(不携带类型第一个字段)long msgtyp,   // 该类型第一个字段,表示收谁的消息int msgflg     // 怎么收,阻塞/非阻塞等);// 返回值>0实际读到的数据个数,为-1读出错

三 demo代码

#pragma once#include <iostream>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <unistd.h>#include <cstring>// 初始化数据
const int mydefault = -1;// 标识谁发的消息
const int Ser = 1;
const int Cli = 2;// 协同 ftok 形成的 key_t 类型
const static std::string path = "/home/CD/linux/message_queue/Demo_mes";
const static int proj_id = 1234;// 缓冲区大小
const size_t mes_buff = 4096;// 消息类型
struct mymessage
{long mtype;char mtext[mes_buff];
};// 转16进制
void To_Hex(int val)
{char buff[1024] = {0};snprintf(buff, sizeof(buff), "0x%x", val);std::cout << buff << std::endl;return;
}// 获取 key_t
key_t Getkey(const std::string &mypath, int myproj)
{key_t key = ftok(mypath.c_str(), myproj);return key;
}// 公共方法
class Message_Queue
{
public:Message_Queue(const std::string &mypath, int myproj) : _msqid(mydefault), _path(mypath), _proj_id(myproj), _key(mydefault){_key = Getkey(_path, _proj_id);if (_key == -1){std::cout << "Get key failed" << std::endl;exit(-1);}To_Hex(_key);}// 创建者创建消息队列void Create(){// 创建并设置权限为 读写_msqid = msgget(_key, IPC_CREAT | IPC_EXCL | 0666);if (_msqid == -1){std::cout << "Create _msqid failed" << std::endl;exit(-2);}}// 获取已经存在的消息独立额void User(){_msqid = msgget(_key, IPC_CREAT);if (_msqid == -1){std::cout << "User get failed" << std::endl;exit(-2);}}// 释放消息队列void destroy(){if (_msqid != -1){int n = msgctl(_msqid, IPC_RMID, nullptr);if (n == -1){std::cout << "remove messque falied" << std::endl;exit(3);}}}// 发送消息void Sendmessage(){// 定义消息对象并初始化mymessage data;memset(&data, 0, sizeof(data));// 标明谁发的data.mtype = Ser;while (true){std::string s;std::getline(std::cin, s);memcpy(&data.mtext, s.c_str(), sizeof(s));//  发送消息                   不能携带第一个字段 缓冲区满了就阻塞int n = msgsnd(_msqid, &data, sizeof(data.mtext), 0);if (n == -1){std::cout << "msgsnd failed" << std::endl;}if (data.mtext[0] == 'q')break;}}// 接收消息void Recvmessage(){// 定义消息对象mymessage data;while (true){//   接收消息                 不能携带第一个字段 收谁的消息 缓冲区为空就阻塞int n = msgrcv(_msqid, &data, sizeof(data.mtext), Ser, 0);data.mtext[n] = 0;if (n == -1){std::cout << "msgrcv failed" << std::endl;}if (data.mtext[0] == 'q')break;std::cout << data.mtext << std::endl;}}~Message_Queue(){}private:// 标识消息队列idint _msqid;// 形成的 key_tstd::string _path;int _proj_id;key_t _key;
};// 创建者/收消息
class server : public Message_Queue
{
public:server() : Message_Queue(path, proj_id){Message_Queue::Create();Message_Queue::Recvmessage();}~server(){Message_Queue::destroy();}
};// 获取者/发消息
class client : public Message_Queue
{
public:client() : Message_Queue(path, proj_id){Message_Queue::User();Message_Queue::Sendmessage();}~client() {}
};

四 基于责任链模式和消息队列对数据处理

1. 什么是责任链模式

责任链属于行为类设计模式,也就是程序在运行的时候每个模块之间都有任务和优先级,哪些任务完成或者不完成的结果要交给下一个节点(处理点),也可以不启动这个任务,就好比食堂打饭打完这个菜,后面的菜可打可不打。

2. 下面基于责任链模式来对消息队列获取的消息进行处理
  • 给消息加上时间戳
  • 把消息保存到文件
  • 如果文件行数超过一个范围则重命名

基于多态实现的责任链

  • 先分别创建3个任务类继承自剧中调度类
  • 因为都继承了这个类,在这个类定义一个指针,在把这个指针指向下一个节点的剧中调度类
  • 调用这个父类被继承的方法,就会构成多态转而去调用子类的方法进行数据处理

3. demo代码

Chai_of_responsibility.hpp

#include <iostream>
#include <memory>
#include <ctime>
#include <string>
#include <unistd.h>
#include <fstream>
#include <cstdio>class base
{
public:base() : _status(true) {}virtual ~base() {}virtual void hander(const std::string &message) = 0;public:void setnext(std::shared_ptr<base> next){_next = next;}void setstatusfalse(){_status = false;}void setstatustrue(){_status = true;}protected:std::shared_ptr<base> _next;bool _status;
};class format : public base
{
public:format() {}~format() {}void hander(const std::string &message) override{std::string str;if (_status == true){// 拼上时间戳/pidstr += std::to_string(time(nullptr));str += ' ';str += std::to_string(getpid()) + '\n';std::cout << "拼上 时间戳和pid" << std::endl;}str += message;std::cout << "str -> " << str << std::endl;if (_next != nullptr){_next->hander(str);}else{std::cout << "format hander over" << std::endl;}}
};const std::string default_path = "/home/CD/linux/message_queue/Demo_mes/test_path/";
const std::string default_name = "111.txt";class save : public base
{
public:save(const std::string &path = default_path, const std::string &name = default_name): _path(path), _name(name){}~save() {}void hander(const std::string &message) override{// 保存到文件std::ofstream os;if (_status == true){std::string str = _path + _name;os.open(str, std::ios::app);if (!os.is_open()){std::cout << "save file failed" << std::endl;}os << message;std::cout << "保存成功" << std::endl;}if (_next != nullptr){_next->hander(message);}else{std::cout << "save hander over" << std::endl;}os.close();}private:std::string _path;std::string _name;
};const int range = 5;
class backup : public base
{
public:backup(const std::string &path = default_path, const std::string &name = default_name): _path(path), _name(name){}~backup() {}private:bool line_range(const std::string &file_path){std::ifstream ifs;ifs.open(file_path);int cnt = 0;std::string s;while (std::getline(ifs, s)){cnt++;}ifs.close();return cnt > range;}public:void hander(const std::string &message) override{// 重命名/备份std::string str = _path + _name;std::string sname = _name + std::to_string(time(nullptr));if (_status == true){if (line_range(_path + _name)){if (rename(str.c_str(), (_path + sname).c_str()) == -1){std::cout << "rename failed" << std::endl;}std::cout << "重命名成功" << std::endl;}}if (_next != nullptr){_next->hander(str);}else{std::cout << "backup hander over" << std::endl;}}private:std::string _path;std::string _name;
};class enter
{
public:enter(){_fm = std::make_shared<format>();_sa = std::make_shared<save>();_bk = std::make_shared<backup>();_fm->setnext(_sa);_sa->setnext(_bk);_bk->setnext(nullptr);}void Choose(bool fm, bool sa, bool bk){fm ? _fm->setstatustrue() : _fm->setstatusfalse();sa ? _sa->setstatustrue() : _sa->setstatusfalse();bk ? _bk->setstatustrue() : _bk->setstatusfalse();}void run(const std::string messgae){_fm->hander(messgae);}~enter(){}private:std::shared_ptr<format> _fm;std::shared_ptr<save> _sa;std::shared_ptr<backup> _bk;
};

http://www.hkcw.cn/article/uBAqIJHlcn.shtml

相关文章

解决8080端口被占问题

文章目录 1. 提出问题2. 解决问题2.1 查看占用8080端口的进程2.2 杀死占用8080端口的进程2.3 测试问题是否已解决3. 实战小结1. 提出问题 运行Spring Boot项目,报错8080端口被占 2. 解决问题 2.1 查看占用8080端口的进程 执行命令:netstat -ano | findstr :8080 2.2 杀死占用…

【harbor】--基础使用

推送 不同的管理工具都有说明 以docker为例 # 第一步--打标签 docker tag SOURCE_IMAGE[:TAG] 192.168.121.201:801/haohao_fist/REPOSITORY[:TAG] # 第二步--推送 docker push 192.168.121.201:801/haohao_fist/REPOSITORY[:TAG]默认push推送为https push会失败 解决办法…

论文略读:To the Globe (TTG): Towards Language-Driven Guaranteed Travel Planning

2024 Emnlp demo 提出了TTG&#xff0c;一个演示系统&#xff0c;它能够接收用户的自然语言指令&#xff0c;并在几秒钟内生成最优的旅行行程 结合了大语言模型&#xff08;LLMs&#xff0c;Llama-3 70B&#xff09;与现有的符号求解器&#xff08;例如混合整数线性规划&#…

《操作系统真相还原》——初探保护模式

BUG 果不其然出现bug b 0x900在进入loader的时候打个断点&#xff0c;看看什么情况&#xff0c;好吧突然想起来&#xff0c;可能弄错镜像了 重新执行 正确 info gdt看一下相关信息

[yolov11改进系列]基于yolov11引入高效坐标注意力机制CoordAttention的python源码+训练源码

【CoordAttention介绍】 在轻量级网络上的研究表明&#xff0c;通道注意力会给模型带来比较显著的性能提升&#xff0c;但是通道注意力通常会忽略对生成空间选择性注意力图非常重要的位置信息。因此&#xff0c;新加坡国立大学的提出了一种为轻量级网络设计的新的注意力机制&a…

AI Agent的“搜索大脑“进化史:从Google API到智能搜索生态的技术变革

AI Agent搜索革命的时代背景 2025年agent速度发展之快似乎正在验证"2025年是agent元年"的说法&#xff0c;而作为agent最主要的应用工具之一(另外一个是coding)&#xff0c;搜索工具也正在呈现快速的发展趋势。Google在2024年12月推出Gemini Deep Research&#xff0…

以防长:辛瓦尔已死 这些人是下个目标

当地时间5月31日,以色列国防军发表声明,确认在以色列国防军与以色列国家安全总局(辛贝特)今年5月13日的联合行动中,以色列空军对巴勒斯坦伊斯兰抵抗运动(哈马斯)军事领导人穆罕默德辛瓦尔发动空袭并将其打死。以军声明称,此次空袭还打死了包括哈马斯拉法旅指挥官穆罕默…

8旬老人砍掉小区20年香樟树 私自修剪引发争议

家住浦东新区上南山水苑一期的业主王先生反映,小区内一群平均年龄七十多岁的老人以提高绿化环境为由,私自圈占公共绿化变为私人花园已有两年。物业和居委会多次劝阻无效,老人们的花越种越多。本周二上午,一位老人用网购的斧头和电锯砍伐了小花园内一棵20多年的香樟树。投诉…

代码随想录算法训练营第60期第五十三天打卡

大家好&#xff0c;我们今天来到了最后一章图论&#xff0c;其实图论比较难&#xff0c;涉及的算法也比较多&#xff0c;今天比较重要的就是深度优先搜索与广度优先搜索&#xff0c;后面的迪杰斯特拉算法等算法在我们求最短路都会涉及到&#xff0c;还有最近公共祖先&#xff0…

【Bluedriod】蓝牙协议栈GD模块(GD_SHIM_MODULE)启动机制及源码解析

本文深入剖析Android蓝牙协议栈中GD模块的启动机制&#xff0c;从模块生命周期管理、状态转换、异步初始化&#xff0c;到核心组件&#xff08;HCI层、协议栈管理器、广播/扫描/测距模块&#xff09;的协同运作。通过源码分析揭示蓝牙协议栈如何通过分层设计实现硬件抽象化、事…

threejsPBR材质与纹理贴图

1. PBR材质简介 本节课没有具体的代码&#xff0c;就是给大家科普一下PBR材质&#xff0c;所谓PBR就是&#xff0c;基于物理的渲染(physically-based rendering)。 Three.js提供了两个PBR材质相关的APIMeshStandardMaterial和MeshPhysicalMaterial,MeshPhysicalMaterial是Mes…

Leetcode 3231. 要删除的递增子序列的最小数量

1.题目基本信息 1.1.题目描述 给定一个整数数组 nums&#xff0c;你可以执行任意次下面的操作&#xff1a; 从数组删除一个 严格递增 的 子序列。 您的任务是找到使数组为 空 所需的 最小 操作数。 1.2.题目地址 https://leetcode.cn/problems/minimum-number-of-increas…

【SpringBoot实战】优雅关闭服务

文章目录 一、什么是优雅关闭&#xff1f;二、优雅关闭的核心步骤三、SpringBoot优雅关闭实现四、关键注意事项1. 超时时间必须配置2. 信号支持局限性3. 特殊请求处理 五、底层实现原理六、总结 一、什么是优雅关闭&#xff1f; 优雅关闭&#xff08;Graceful Shutdown&#x…

Redis:功能特性和应用场景

&#x1f308; 个人主页&#xff1a;Zfox_ &#x1f525; 系列专栏&#xff1a;Redis 本篇开始对于 Redis 进行正式介绍和学习 &#x1f525; 认识 Redis 在开始 Redis 学习前&#xff0c;要先认识一下 Redis Redis 的设计&#xff0c;是想要把它当做是一个数据库&#xff…

etcd详解

一、核心特性二、架构原理三、应用场景四、运维实践五、常见问题与解决方案六、与 ZooKeeper 和 Consul 的对比总结 etcd 是一个高可用的分布式键值存储系统&#xff0c;广泛应用于云原生领域&#xff0c;尤其作为 Kubernetes 的核心组件&#xff0c;用于存储集群的配置、状态和…

CTFHub-RCE 命令注入-综合练习

观察源代码 代码里面可以发现过滤了运算符、目录分隔符、分号、空格还有一些关键字也被过滤了 判断是Windows还是Linux 源代码中有 ping -c 4 说明是Linux 查看有哪些文件 用换行符的url值&#xff08;%0a&#xff09;代替分号注意&#xff1a;在url中输入 ?ip127.0.0.1%0a#…

网络编程1_网络编程引入

为什么需要网络编程&#xff1f; 用户再在浏览器中&#xff0c;打开在线视频资源等等&#xff0c;实质上说通过网络&#xff0c;获取到从网络上传输过来的一个资源。 与打开本地的文件类似&#xff0c;只是这个文件的来源是网络。相比本地资源来说&#xff0c;网络提供了更为…

性能优化 - 理论篇:性能优化的七类技术手段

文章目录 Pre引言性能优化的七类技术手段性能优化策略一览表1. 复用优化2. 计算优化2.1 并行执行2.2 变同步为异步2.3 惰性加载 3. 结果集优化3.1 数据格式与协议选择3.2 字段精简与按需返回3.3 批量处理与分页3.4 索引与位图加速 4. 资源冲突优化4.1 锁的分类与特点4.2 无锁与…

Android之ListView

1&#xff1a;简单列表(ArrayAdapter) 1&#xff1a;运行的结果&#xff1a; 2&#xff1a;首先在MyListView里面创建一个按钮&#xff0c;点击的时候进行跳转。 这里让我吃惊的是&#xff0c;Button里面可以直接设置onClick .java里面的方法。 也即是点击这个按钮之后就会去…

Unity程序集

对于Unity的程序集&#xff0c;具体内容可以参考Unity官方文档&#xff0c;程序集定义 - 预定义程序集 比如Unity的默认程序集&#xff0c;Assembly-CSharp.dll&#xff0c;还有其他的比如 Assembly-CSharp-Editor.dll&#xff0c;Assembly-CSharp-firstpass.dll 没有指定或…