目录
初始化信号量
销毁信号量
等待信号量
发布信号量
基于环形队列的生产消费模型
代码实现
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。本质就是一个计数器,是对特定资源的预定机制,本身相比于基本类型,POSIX信号是原子的,POSIX可以用于线程间同步,信号量把对临界资源是否存在,就绪等的条件,以原子性的形式呈现在临界资源前就判断了。
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:pshared:0表⽰线程间共享,⾮零表⽰进程间共享value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
//申请成功执行后面的代码,申请失败阻塞挂起
发布信号量
功能:发布信号量,表⽰资源使⽤完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于环形队列的生产消费模型
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者
标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态
设计思路
- 约定一:空,生产者先行
- 约定二:满,消费者先行
- 约定三:生产者不能快过消费者,不能套一个圈以上,防止生产满了还生产
- 约定四:消费者不能超过生产者,防止没有生产好就消费
总结:只有不是同一个位置就能并发进行,用数组来模拟环形队列,开始没有东西,生产者先生产,消费者等待,等生产者生产了数据,唤醒消费者生产,此时开始并发,假如生产者快过消费者,只有判断下一个生产节点是否和消费者位置一致就进行等待,等消费者消费完唤醒,假如消费者快过生产者,只有消费者下一个节点是生产者处的位置时,此时没有数据就进入等待队列,等待唤醒,数据为空为满都是互斥
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
代码实现
单生产,单消费
Sem.hpp
#include <iostream>
#include <pthread.h>
#include <semaphore.h>const int defaultvalue = 5;
namespace SemModule
{class Sem{public:Sem(unsigned int sem_value = defaultvalue){sem_init(&_sem, 0, sem_value);}~Sem() {sem_destroy(&_sem);}void V(){int n=sem_post(&_sem);}void P(){int n=sem_wait(&_sem);}private:sem_t _sem;};
}
Main.cc
#include<iostream>
#include <iostream>
#include <pthread.h>
#include<unistd.h>
#include"RingQueue.hpp"
//消费void *consumer(void *args)
{RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){sleep(2);int t =0;rq->Pop(&t);std::cout<<"消费拿到一个数据:"<<t<<std::endl;}
}
//生产
void *productor(void *args)
{ int data=1;RingQueue<int> *rq = static_cast<RingQueue<int>*>(args);while(true){ std::cout<<"生产了一个任务"<<std::endl;rq->Equeue(data++);}
}int main()
{// 申请阻塞队列RingQueue<int> *rq = new RingQueue<int>();// 创建消费则模型pthread_t c[1], p[1];pthread_create(c, nullptr, consumer, rq);pthread_create(p, nullptr, productor, rq);pthread_join(c[0],nullptr);pthread_join(p[1],nullptr);return 0;
}
RingQueue.hpp
#include <vector>
#include <iostream>
#include "Sem.hpp"using namespace SemModule;
static const int gcap = 5;template <typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _rq(cap), _cap(cap), _blank_sem(cap), _p_step(0), _data_sem(0), _c_step(0){}~RingQueue() {}// 生产者调用void Equeue(const T &in){// 1.申请信号量,空位置信号量_blank_sem.P();//2.生产_rq[_p_step]=in;//3.更新下标,并维持环形特性_p_step++;_p_step%=_cap;//4.通知_data_sem.V();}// 消费者调用void Pop(T *out){//1.申请信号量,数据信号量_data_sem.P();//2.消费*out=_rq[_c_step];//3.更新下标,并维持环形特性_c_step++;_c_step%=_cap;_blank_sem.V();}private:std::vector<T> _rq; // 环形队列int _cap; // 容量Sem _blank_sem; // 生产者 ->空位置int _p_step; // 生产者下标位置Sem _data_sem; // 消费者 ->空数据int _c_step; //消费者下标位置
};
多生产,多消费
Main.cc
#include<iostream>
#include <iostream>
#include <pthread.h>
#include<unistd.h>
#include"RingQueue.hpp"
//消费
struct threaddata
{
RingQueue<int>*rq;
std::string name;
};void *consumer(void *args)
{threaddata *td=static_cast<threaddata*>(args);while(true){sleep(2);int t =0;td->rq->Pop(&t);std::cout<<td->name<<"消费拿到一个数据:"<<t<<std::endl;}
}
//生产
void *productor(void *args)
{ threaddata *td=static_cast<threaddata*>(args);int data=1;while(true){ sleep(1);std::cout<<td->name<<"生产了一个任务"<<data<<std::endl;td->rq->Equeue(data++);}
}int main()
{ RingQueue<int> *rq = new RingQueue<int>();// 创建消费则模型pthread_t c[2], p[3];threaddata *td1=new threaddata();td1->name="cthread-1";td1->rq=rq;pthread_create(c, nullptr, consumer, td1);threaddata *td2=new threaddata();td2->name="cthread-2";td2->rq=rq;pthread_create(c+1, nullptr, consumer, td2);threaddata *td3=new threaddata();td3->name="pthread-3";td3->rq=rq;pthread_create(p, nullptr, productor, td3);threaddata *td4=new threaddata();td4->name="pthread-4";td4->rq=rq;pthread_create(p+1, nullptr, productor, td4);threaddata *td5=new threaddata();td5->name="pthread-5";td5->rq=rq;pthread_create(p+2, nullptr, productor, td5);pthread_join(c[0],nullptr);pthread_join(c[1],nullptr);pthread_join(p[0],nullptr);pthread_join(p[1],nullptr);pthread_join(p[2],nullptr);return 0;
}
RInfQueue.hpp
#include <vector>
#include <iostream>
#include "Sem.hpp"
#include "Mutex.hpp"using namespace SemModule;
using namespace MutexModule;
static const int gcap = 5;template <typename T>
class RingQueue
{
public:RingQueue(int cap = gcap): _rq(cap), _cap(cap), _blank_sem(cap), _p_step(0), _data_sem(0), _c_step(0){}~RingQueue() {}// 生产者调用void Equeue(const T &in){// 1.申请信号量,空位置信号量_blank_sem.P(); // 先申请信号量在申请锁效率高一点,先把信号量顺序依次分好,然后分到的再去申请锁,在申请锁的同时别的线程也可以去申请信号量,如果先// 申请锁在申请信号量就慢了{LockGuard lockguard(_pmutex);// 2.生产_rq[_p_step] = in;// 3.更新下标,并维持环形特性_p_step++;_p_step %= _cap;}// 4.通知_data_sem.V();}// 消费者调用void Pop(T *out){// 1.申请信号量,数据信号量_data_sem.P();{LockGuard lockguard(_cmutex);// 2.消费*out = _rq[_c_step];// 3.更新下标,并维持环形特性_c_step++;_c_step %= _cap;}_blank_sem.V();}private:std::vector<T> _rq; // 环形队列int _cap; // 容量Sem _blank_sem; // 生产者 ->空位置int _p_step; // 生产者下标位置Sem _data_sem; // 消费者 ->空数据int _c_step; // 消费者下标位置// 维护多生产,多消费Mutex _cmutex;Mutex _pmutex;
};
Mutex.cc
#include <pthread.h>
#include <iostream>
namespace MutexModule
{class Mutex{public:Mutex(){pthread_mutex_init(&_mutex, nullptr);}void Lock(){int n = pthread_mutex_lock(&_mutex);(void)n;}void Unlock(){int n = pthread_mutex_unlock(&_mutex);(void)n;}~Mutex(){pthread_mutex_destroy(&_mutex);}pthread_mutex_t *get(){return &_mutex;}private:pthread_mutex_t _mutex;};class LockGuard{public:LockGuard(Mutex &mutex):_mutex(mutex){_mutex.Lock();}~LockGuard(){_mutex.Unlock();}private:Mutex &_mutex;};
}
Sem.cpp
#include <iostream>
#include <pthread.h>
#include <semaphore.h>const int defaultvalue = 5;
namespace SemModule
{class Sem{public:Sem(unsigned int sem_value = defaultvalue){sem_init(&_sem, 0, sem_value);}~Sem() {sem_destroy(&_sem);}void V(){int n=sem_post(&_sem);}void P(){int n=sem_wait(&_sem);}private:sem_t _sem;};
}