进程间通信目的
数据传输、资源共享、通知事件、进程控制
- 进程间通信的基本方法:使两个进程“看见”同一个文件
进程间通信的主要分类
管道
- 匿名管道 pipe
- 命名管道
system IPC
- 消息队列(数据传输)
- 共享内存(数据共享)
- 信号量 (事件通知)
POSIX IPC (提供了一套进程间通信的方式)
- 消息队列
- 共享内存
- 互斥量
- 条件变量
- 信号量
- 读写锁
管道
最古老的进程间通信的形式
把从一个进程连接到另一个进程的一个数据流称为一个“管道”
pipe函数
功能:创建一无名管道
int pipe(int fd[2]);
参数:
fd:文件描述符数组 fd[0] :读端 fd[1] :写端
返回值:成功返回0,失败返回错误码
代码实现:从键盘读取数据,写入管道,读取管道,写到屏幕1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
int main(void)
{
int fds[2];
char buf[100];
int len;
if(pipe(fds) == -1){
perror("make pipe"),exit(1);
}
//键盘读取
while(fgets(buf,100,stdin))
{
len = strlen(buf);
//write into pipe
if(write(fds[1],buf,len) != len)
{
perror("write to pipe");
break;
}
memset(buf,0x00,sizeof(buf));
//管道读取
if((len = read(fds[0],buf,100)) == -1)
{
perror("read from pipe");
break;
//向屏幕写入
if(write(1,buf,len) != len)
{
perror("write to stdout");
break;
}
}
}
}
mkfifo函数1
2
3
4
5//功能:创建管道文件
int mkfifo(const char *name , mode_t mode)
//参数:fd:文件描述符数组 fd[0] :读端 fd[1] :写端
//返回值:成功返回0.失败返回错误码
打开管道文件的函数1
2
3int fd=open(name, O_RDONLY); //读
int fd=open(name, O_WDONLY); //写
//read/write/ 语义和匿名管道一样
消息队列
msgget 函数1
2
3
4
5
6//功能:创建和访问一个消息队列
int msgget (key_t key, //消息队列名字,相当于文件名
int flags ); // 创建:IPC_CREAT| 0644(权限) 打开:0
返回值:成功:消息队列的id, 相当于文件描述符, 失败返回-1
系统中总共能创建多少个消息队列?
- msgmni
msgmni 定义了系统范围内的消息队列上限。与信号量一样,消息队列也拥有一个相关
的标识符。在系统初始化阶段里,内核创建一个指向消息队列标识符结构的指针数组。该
数组的项数由 msgmni确定。对于每个消息队列,Linux 内核为标识符分配44B,为消息队
列数据结构分配 96B。为了获得更多的消息队列资源,可以动态增加 msgmni 取值。和信
号量一样,消息队列标识符的最大数目也受限于IPCMNI。msgmni的默认上限为 16B,这
可能不足以保证一些大型数据库应用平滑地运行。如果在系统上要运行数据库应用的话,
推荐默认上限值是 128B。 msgmax
msgmax 限制进程可以发送的消息长度。该参数由 Msgsnd()函数加以应用。如果待发
送消息的长度超过该值,则返回一个错误。该参数可以在运行时调整msgmnb
msgmnb 确定一个消息队列的容量。该参数的取值存储在消息队列标识符结构的某个
域中,用于确定是否存在着对新消息进行排队的空间。msgmnb 值可以动态修改,默认为
16384。修改其取值会影响到所有新的消息队列的容量。用户可以通过 Msgctl()系统调用来
增加现有消息队列的容量- 修改
永久修改
root用户下修改/etc/sysctl.conf 文件。
更改的方法: 在配置文件/etc/sysctl.conf中加上 kernel.msgmax=value kernel.msgmni=value kernel.msgmnb=value 然后运行sysctl -p 即可进行修改1
2
3max queues system wide = // msgmni
max size of message (bytes) = //msgmax
default max size of queue (bytes) = //msgmnb
临时修改
root用户下sysctl -w kernel.msgmnb= 1048576
相关函数
msgsnd函数1
2
3
4
5
6
7
8
9
10
11
12
13
14//功能:往消息队列中发送消息
int msgsnd(int id, //msgget的返回值
const void * msgp, //要发送的消息在哪里
size_len, //消息的字节数,不包括channel的大小
int msgflg); //0
//返回值:成功:0,失败: -1
struct msgbuf {
long mtype(channel); //消息类型(通道号),必须>=1
char mtext[1]; //写上自己的消息的数据类型
};
msgrcv函数1
2
3
4
5
6
7
8
9
10//功能:从消息队列中取数据
size_t msgrcv(int id,
const void * msgp, //取出来的消息放哪里
size_t len, //装消息的地方的大小,不包括类型
long mtype , //取那个类型消息
int flag); //0
//返回值:成功:实际读取了多少个字节,失败:0
共享内存—— 最快的进程间通信方式
共享内存(shared memory):是linux下的多进程之间的通信方法,这种方法通常用于一个程序的多进程间通信,实际上多个程序间也可以通过共享内存来传递信息。共享内存指在多处理器的计算机系统中,可以被不同中央处理器(CPU)访问的大容量内存。由于多个CPU需要快速访问存储器,这样就要对存储器进行缓存(Cache)。共享内存是存在于内核级别的一种资源,在shell中可以使用ipcs命令来查看当前系统IPC中的状态,在文件系统/proc目录下有对其描述的相应文件。共享内存相比其他几种方式有着更方便的数据控制能力,数据在读写过程中会更透明。
shmget函数1
2
3
4
5
6
7//功能:创建或打开共享内存
int shmget(key_t key,
size_t size, //共享内存段大小
int shmflg); //创建 IPC_CREAT |0644 ,打开 0
//返回值:失败:-1 , 成功:返回一个有效的共享内存标识符
shmat 函数
1 |
|
shmdt 函数1
2
3
4
5
6//功能:卸载掉共享内存段
int shmdt(const void *shmaddr);
//删除共享内存:shmc
int shmctl(int id,
int cmd , //IPC_RMID,
NULL);
shmctl函数
共享内存的控制函数1
2
3
4
5
6
7
8int shmctl(int shm_id, int cmd, struct shmid_ds *buf);
shmid_ds结构至少包含以下成员:
struct shmid_ds {
uid_t shm_perm.uid;
uid_t shm_perm.gid;
mode_t shm_perm.mode;
}
参数:1
2shm_id : 是shmget返回的共享内存标识符。
command: 是要采取的动作,
它可以取3个值:1
2
3
4IPC_STAT 把shmid_ds结构中的数据设置为共享内存的当前关联值
IPC_SET 如果进程有足够的权限,
就把共享内存的当前关联值设置为shmid_ds结构中给出的值
IPC_RMID 删除共享内存段
buf : 是一个指针,
包含共享内存模式和访问权限的结构。
返回值:
成功时,返回0,
失败时,返回-1.
comm.h代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
int createShm(int size);
int getShm();
int destoryShm(int shmid);
comm.c代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41int CommShm(int size,int flags)
{
int key=ftok(PATHNAME,PROJ_ID);
if(key<0)
{
perror("ftok");
return -1;
}
int shmid=shmget(key,size,flags);
if(shmid<0)
{
perror("shmid");
return -2;
}
return shmid;
}
int CreateShm(int size)
{
return CommShm(size, IPC_CREAT|IPC_EXCL|0666);
}
int GetShm()
{
return CommShm(0, IPC_CREAT);
}
int DestoryShm(int shmid)
{
if(shmctl(shmid,IPC_RMID,NULL)<0)
{
perror("shmctl");
return -1;
}
return 0;
}
server.c代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20int main()
{
int count = 0;
int shmid = CreateShm(4096);
sleep(5);
char * buf = shmat(shmid, NULL, 0);
while(count<4096)
{
buf[count] = 'A'+count%26;
sleep(2);
count++;
buf[count] = '\0';
}
shmdt(buf);
sleep(5);
DestoryShm(shmid);
return 0;
}
client.c 代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
int main()
{
int shmid = GetShm();
sleep(5);
char* buf = shmat(shmid, NULL, 0);
while(1)
{
printf("%s\n", buf);
sleep(2);
}
shmdt(buf);
return 0;
}
信号量(集)
(本质就是计数器)
1.什么是信号量
信号量是一种特殊的变量,访问具有原子性。
即只允许对它进行两个操作:
1)等待信号量
当信号量值为0时,程序等待;当信号量值大于0时,信号量减1,程序继续运行。
2)发送信号量
将信号量值加1。
我们使用信号量,来解决进程或线程间共享资源引发的同步问题。
semget函数1
2
3
4//功能:创建或打开信号量
int semget(key_t key ,
int nsems ,//信号量集中信号量的个数
int flags); //打开0,创建IPC_CREAT|0644
semct函数1
2
3
4
5
6
7
8
9
10//功能:设置信号量初值
int semct(semid,
int semnum, //信号量集中的第几个信号量
int cmd, // SETVAL
su); //信号量初值
union semun
{
int val ; //value for SETVAL
};
semct函数1
2
3
4
5//功能:查看信号量的值
int semct(semid,
int semnum, //信号量集中的第几个信号量
int cmd, // GETVAL
0); //信号量初值
P,V操作
key_t键和ftok函数
函数ftok把一个已存在的路径名和一个整数标识符转换成一个key_t值,称为IPC键值(也称IPC key键值)。ftok函数原型及说明如下:
ftok(把一个已存在的路径名和一个整数标识符转换成IPC键值)所需头文件
#include <sys/types.h>
#include <sys/ipc.h>
- 函数说明
把从pathname导出的信息与id的低序8位组合成一个整数IPC键
- 函数原型
key_t ftok(const char * pathname, int proj_id)
- 函数传入值
pathname:指定的文件,此文件必须存在且可存取
proj_id:计划代号(project ID)
- 函数返回值
成功:返回key_t值(即IPC 键值)
出错:-1,错误原因存于error中
- 附加说明
key_t一般为32位的int型的重定义
ftok的典型实现是调用stat函数,然后组合以下三个值:
① pathname所在的文件系统的信息(stat结构的st_dev成员)。
② 该文件在本文件系统内的索引节点号(stat结构的st_ino成员)。
③ proj_id的低序8位(不能为0)。
上述三个值的组合产生一个32位键。
PV操作与信号量的处理相关,P表示通过的意思,V表示释放的意思。
ftok函数,先不去了解它的作用来先说说为什么要用它,共享内存,消息队列,信号量他们三个都是找一个中间介质来进行通信的,这种介质多的是。就是怎么区分开来,就像唯一一个身份证来区分人一样。只要唯一就行,就想起来了文件的设备编号和节点,它是唯一的,但是直接用它来做识别好像不太好,不过可以用它来产生一个号。ftok()就出场了。
ftok函数在一般的UNIX中,通常是将文件的索引节点取出,然后在前面加上子序号就得到key_t的值。1
2
3
4
5
6
7
8
9semop(int semid,
struct sembuf sb[],
int len);
struct sembuf
{
short sem_num , //信号量的下标
short sem_op , //1 v ,1 p
short sem_flg // 0
};
代码:
comm.h1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
union semun{
int val;
struct semid_ds * buf;
unsigned short * array;
struct seminfo * _buf;
};
int createSemSet(int nums);
int initSem(int semid,int nums,int initVal);
int P(int semid,int who);
int V(int semid,int who);
int destroySemSet(int semid);
comm.c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
static int commSemSet(int nums,int flags){
key_t _key = ftok(PATHNAME,PROJ_ID);//将文件路径设为当前文件
if(_key < 0){
perror("ftok");
return -1;
}
//创建和访问一个信号量集
int semid = semget(_key,nums,flags);//key:信号集的名字,nums:信号集中信号量的个数,flags:权限标志
if(semid < 0){
perror("semget");
return -2;
}
return semid;
}
int createSemSet(int nums){
return commSemSet(nums,IPC_CREAT|IPC_EXCL|0666);
}
int getSemSet(int nums){
return commSemSet(nums,IPC_CREAT);
}
int initSem(int semid,int nums,int initVal){
union semun _un;
_un.val = initVal;
//控制信号量集
if(semctl(semid,nums,SETVAL,_un) < 0){//semid:由semget返回的信号集标识码,nums:信号量集的序号
//SETVAL:设置信号量集中的信号量的计数器
perror("semctl");
return -1;
}
return 0;
}
static int commPV(int semid,int who,int op){
struct sembuf _sf;
_sf.sem_num = who;//信号量的编号
_sf.sem_op = op;//信号量一次PV操作时加减的数值
_sf.sem_flg = 0;
//创建和访问一个信号量集
if(semop(semid,&_sf,1) < 0){//semid:信号量标识码,&sf:sembuf结构体 ,1:信号量的个数
perror("semop");
return -1;
}
return 0;
}
int P(int semid,int who){
return commPV(semid,who,-1);
}
int V(int semid,int who){
return commPV(semid,who,1);
}
int destroySemSet(int semid){
if(semctl(semid ,0,IPC_RMID)<0){
perror("semctl");
return -1;
}
}
test.c1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
int main(){
int semid = createSemSet(1);
initSem(semid,0,1);
pid_t id = fork();
if(id == 0){
//child
int _semid = getSemSet(0);
while(1){
P(_semid,0);
printf("A");
fflush(stdout);
usleep(123456);
printf("A ");
fflush(stdout);
usleep(321456);
V(_semid,0);
}
}else{//father
while(1){
P(semid,0);
printf("B");
fflush(stdout);
usleep(223456);
printf("B ");
fflush(stdout);
usleep(121456);
V(semid,0);
}
wait(NULL);
}
destroySemSet(semid);
return 0;
}
ipcs小结
ipcs -q 列出所有的消息队列
ipcs -m列出所有的共享内存
ipcs -s列出所有的信号量
ipcrm -q 删除指定的消息队列
ipcrm -m删除指定的共享内存
ipcrm -s删除指定的信号量
同步与互斥
互斥:
由于各进程要求共享资源,而且有些资源需要互斥使用,因此各进程间竞争使用这些资源,进程的这种关系为进程的互斥,理解起来就是一个用这份资源的话另一个就不能用,这两者之间就是互斥关系
同步:
指的是多个进程协同完成同一项任务
生产者消费者问题:
也叫缓存绑定问题,是一个经典的多进程同步问题,分为单生产者单消费者和多生产者多消费者两种情况。
生产者顾名思义,是一个产生数据的进程,而消费者是一个读出数据的进程。
对于单生产者单消费者模式,生产者产生一块数据后,放进缓冲池中,这时消费者看到缓冲池中有数据就可以来拿,如果生产者生产比消费者消费的快,消费者拿完缓冲池中所有数据之后就应该开始等待,等到有数据之后消费者才来消费,如果生产者生产比消费者消费的快,很快就将缓冲池装满了,生产者这时就不再产生数据,等消费者消费一块数据然后生产者再向里面装一块数据。
对于多生产者多消费者问题,就是在生产者之间和消费者之间添加互斥关系,其它和单生产者单消费者相同。
或许概念理解起来比较抽象,不过我们可以类比:我们将缓冲池类比成超市,而超市的供货商就是生产者,而我们这些买东西的人就是消费者,单消费者单生产者问题就可以类比为,这家超市只卖一种东西,这里就假设是方便面吧,这家超市小了一点只有一个货架,而且老板脾气比较古怪,只卖一种口味的,而这家超市是方圆百里唯一一家卖东西的,而只有一个人买东西哎,当供货商没有往货架上放方便面的时候即使我们再想吃也没办法,我们只能等,而货架的大小是固定的,供货商如果一次性发货太多了,只能把货架塞满,剩下的只能别人买走一包,然后往货架上再新添一包。多生产者多消费者问题就可以类比为,有多个供货商不过还是这家超市,货架还是那么大,多个供货商都往货架上放方便面,但是货架的空间用一个就少一个意味着一个供销商在这个位置放了,另一个供销商就不能往这个位置放了,只能考虑下一个位置是不是空的,是空的即可以放,不是空的就继续考虑下一个位置,如果放满了就等待,等到有人买走了再次有空位置了才可以放,对于每一个位置,各个供销商之间的关系就是一种互斥关系,而这时买东西的人也不是一个了,有很多个,但对于每个位置,你拿了别人就不能拿了,因为那个位置已经空了,只能看看下个位置有没有你要的方便面,没有的话再向后找,如果找到头也没有就只能等着有某一家供销商放上去了然后你才能拿。对于我们这些消费者针对于每一个位置上的方便面,我们之间存在的也是互斥关系。