这篇文章我们来认识一下线程间通信的一种手段—共享内存,共享内存由于其特殊性—共享内存是存在与每个进程的地址空间中的,通俗点就是这部分数据是对每个进程可见的,这样每个进程都可以在一定条件下直接操作共享内存中的数据,避免了数据的复制等耗时的操作,这也是共享内存比其他几种IPC机制(信号量、管道、消息队列等)的通信方式效率高的原因,但是共享内存也有缺点,那就是需要独立实现消息的同步机制。
本文将完成一个基础的共享内存类模板,包含在DeltaCV项目中,欢迎star,fork。
概述
在boost/Interprocess中,提供了很多关于操作系统底层的进程间通信的抽象层,它使用C++将操作系统的底层接口进行封装,并消除了不同操作系统之间各种各样的接口带来的开发困难等问题。
共享内存
先介绍一种最普通的共享内存创建方式,boost::interprocess::shared_memory_object类。
#include <boost/interprocess/shared_memory_object.hpp>
// boost::interprocess::open_or_create: 第一个参数说明了当前的操作是什么,直接可以根据字面意思猜测到,open_or_create指如果尝试打开名字为DeltaCV的共享内存失败时,就创建它,当然也有create_only,open_only操作。
// "DeltaCV": 表明当前我们操作的共享内存的名字为DeltaCV。
// boost::interprocess::read_write: 所有进程对此共享内存的操作权限(暂且可以这样认为,感觉不太严谨),read_write即可读写,同时还有read_only供选择。
boost::interprocess::shared_memory_object shm(boost::interprocess::open_or_create, "DeltaCV", boost::interprocess::read_write);
共享内存创建后大小为0,我们使用truncate()方法来为共享内存设定大小,单位为字节。
// 为刚才创建的共享内存开辟1024个字节大小的空间。
shm.truncate(1024);
然而刚才我们创建的共享内存虽然对所有进程可见,但是现在还不能直接操作,因为每个进程都有自己独立的地址空间,我们还需要将上面的共享内存映射进当前进程的地址空间中,可以使用mapped_region()方法完成。
#include <boost/interprocess/mapped_region.hpp>
//shm: 传入上文实例化的共享内存对象
//boost::interprocess::read_write: 当前进程对此共享内存的操作权限
/*
函数的原型如下
mapped_region region(
shm, //Memory-mappable object
mode, //Access mode
start, //Offset from the beginning of shm
end //Length of the region
);
当你省略后面两个参数时,将默认映射整个共享内存对象
*/
boost::interprocess::mapped_region region(shm, boost::interprocess::read_write);
// 共享内存的其实地址
std::cout<<region.get_address()<<std::endl;
// 共享内存块的大小
std::cout<<region.get_size()<<std::endl;
接下来就是开始向共享内存中存数据了,跟我们普通的指针操作一样.
// 先将地址的指针强制转换成int指针,因为我们要储存的是int变量
int *n = static_cast<int*>(region.get_address());
// 和普通的指针操作一样,这里讲1存放进共享内存中
*n = 1;
// 如果在其它程序中我们已经存放过数据了,而想要取数据,则*n就是我们要取的数据了
那如何删除我们创建的共享内存呢?多数linux系统中,如果不主动删除共享内存,它会一直存在到系统重启.
//返回值指示了是否删除成功
bool removed = boost::interprocess::shared_memory_object::remove("DeltaCV");
托管共享内存
然而上述的方式几乎不会用到,因为它每次需要按单个字符的形式读写内存,所以绝大多数情况下我们会使用托管共享内存方式,它会以内存申请的方式对共享内存对象进行初始化.
#include <boost/interprocess/managed_shared_memory.hpp>
// boost::interprocess::open_or_create: 含义同上
// "Highscore": 共享内存的名称
// 1024: 共享内存的大小
boost::interprocess::managed_shared_memory managed_shm(boost::interprocess::open_or_create, "DeltaCV", 1024);
// 创建了一个包含10个元素的int型数组,起名为"Integer",且值初始化为1,此时n为数组首元素的地址
int *n = managed_shm.construct<int>("Integer")[10](1); //当"Integer"之前不存在
std::pair<int*,size_t > n = managed_shm.find<int>("Integer")[10](1); //当"Integer"已经存在时,我们使用find来找到这个数据,可见此时返回的是pair类型数据,first保存的是数据首元素地址,second保存的是整个数据的长度.
删除托管共享内存
boost::interprocess::shared_memory_object::remove("DeltaCV");
互斥对象
既然共享内存对所有进程都可见,那么怎么保证共享内存的读写不会相互干扰呢?那就是使用互斥对象,当共享内存被使用时,互斥对象被占用,其他进程想要操作同一共享内存时,就需要等待互斥对象释放,这样就保证了共享内存的正确读写,类似于多线程中的锁.
互斥对象的声明
#include <boost/interprocess/sync/interprocess_mutex.hpp>
// 声明了一个名为"mtx"互斥对象,使用的是find_or_construct,含义同上
boost::interprocess::interprocess_mutex *mtx = managed_shm.find_or_construct<boost::interprocess::interprocess_mutex>("mtx")();
// 上锁,也就是占用互斥对象,一般每个共享变量对应一个互斥对象
mtx->lock();
/*
TODO SOMETHING
*/
//解锁
mtx->unlock();
DeltaCV中的共享内存类模板
至此,关于boost下的共享内存编程就简单的介绍一下,应该能满足日常的使用了,下面我将介绍我在DeltaCV中封装的共享内存类模板,整体框架类似与ROS中的topic形式,提供发布者和订阅者.完整代码见DeltaCV.
发布器
template <typename T1>
class shm_publisher{
public:
shm_publisher(const char* node_name, const int RECEIVE_DATA_LENGTH)
:shm_name(string(node_name)), //创建的
per_data_bytes(sizeof(T1)),
data_length(RECEIVE_DATA_LENGTH+1)
{
lock_name = string(node_name)+"_lock"; //定义互斥变量的名称
data_name = string(node_name)+"_data"; //定义数据的名称
update_name = string(node_name)+"_update"; //更新标志位
boost::interprocess::shared_memory_object::remove(shm_name.c_str()); //首先检查内存是否被释放
boost::interprocess::named_mutex::remove(lock_name.c_str()); //检查互斥变量是否被释放
//托管共享内存
managed_shm = new boost::interprocess::managed_shared_memory(
boost::interprocess::create_only,
shm_name.c_str(),
per_data_bytes*data_length + 4 + 1024);
// 互斥变量
named_mtx = new boost::interprocess::named_mutex(
boost::interprocess::create_only,
lock_name.c_str());
// 变量
user_data = managed_shm->construct<T1>(data_name.c_str())[data_length](0);
update_flag = managed_shm->construct<int>(update_name.c_str())[1](0);
}
~shm_publisher()
{
boost::interprocess::shared_memory_object::remove(shm_name.c_str());
boost::interprocess::named_mutex::remove(lock_name.c_str());
managed_shm->destroy<T1>(data_name.c_str());
managed_shm->destroy<int>(update_name.c_str());
}
// 发布函数如下:
void broadcast(T1* data)
{
named_mtx->lock();
memcpy(user_data, data, per_data_bytes*data_length);
*update_flag = 1;
named_mtx->unlock();
}
private:
T1* user_data;
int* update_flag;
std::string shm_name;
std::string lock_name;
std::string data_name;
std::string update_name;
boost::interprocess::managed_shared_memory* managed_shm;
boost::interprocess::named_mutex* named_mtx;
int per_data_bytes,data_length;
};
订阅器
template <typename T2>
class shm_subscriber{
public:
shm_subscriber(const char* node_name)
:shm_name(string(node_name)),
per_data_bytes(sizeof(T2))
{
lock_name = string(node_name) + "_lock"; //这里的命名规则与发布器对应
data_name = string(node_name) + "_data";
update_name = string(node_name)+"_update";
//托管共享内存
managed_shm = new boost::interprocess::managed_shared_memory(
boost::interprocess::open_only,
shm_name.c_str());
//获得互斥对象
named_mtx = new boost::interprocess::named_mutex(
boost::interprocess::open_only,
lock_name.c_str());
//找到两个变量bianlaing
user_data = managed_shm->find<T2>(data_name.c_str());
update_flag = managed_shm->find<int>(update_name.c_str());
}
// 返回值表示当前取得值是否更新
bool get(T2* data)
{
named_mtx->lock();
if(update_flag.first[0]==1) //检查标志位是否置1
{
update_flag.first[0]=0;
memcpy(data, user_data.first,user_data.second*per_data_bytes);
named_mtx->unlock();
return true;
} else {
named_mtx->unlock();
return false;
}
}
private:
std::pair<T2*,size_t > user_data;
std::pair<int*,size_t > update_flag;
std::string shm_name;
std::string lock_name;
std::string data_name;
std::string update_name;
uint8_t per_data_bytes;
boost::interprocess::managed_shared_memory* managed_shm;
boost::interprocess::named_mutex* named_mtx;
};
小技巧
代码中,我额外添加了一个标志位update_flag,用来指示当前取到的数据是否是最新值,因为只要发布器那边存入一次数据,update_flag这个变量就置为1,而订阅器每取一次数据,就讲update_flag置0,所以当订阅器的获取频率超过发布器时,订阅器取到的数据一部分是未更新的数据,则订阅器能通过get()的返回值来选择是否使用本次获取的数据(若返回值为true,则为最新值,若为false,则说明自上次订阅器取过值后到这次取值之间,发布器没有存入过新的数据).
演示
分别新建两个main函数,代表两个进程,其中一个作为发布器,每5s发布一次数据,另外一个作为订阅器,每1s获取一次数据.
// main1.cpp
#include <iostream>
#include "shm.hpp"
int a[2] = {4,5};
int main() {
deltaCV::shm_publisher<int> pub("test",2);
while(1)
{
p.broadcast(a);
sleep(5);
}
return 0;
}
// main2.cpp
#include <iostream>
#include "shm.hpp"
int a[2];
int main() {
deltaCV::shm_subscriber<int> sub("test");
while(1)
{
bool res = sub.get(a);
cout<<"update: "<<res<<" ,a[0]: "<<a[0]<<" ,a[1]: "<<a[1]<<endl;
sleep(1);
}
return 0;
}
最终输出为:(可以看到5s内,订阅器有4次获取的是未更新的值)
update: 1 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 1 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 1 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
update: 0 ,a[0]: 4 ,a[1]: 5
参考:http://zh.highscore.de/cpp/boost/interprocesscommunication.html