--MartyKalin
本篇是Linux下进程间通讯(IPC)系列的第二篇文章。第一篇文章聚焦于通过共享文件和共享显存段这样的共享储存来进行IPC。这篇文件的重点将转向管线,它是联接须要通讯的进程之间的通道。管线拥有一个写端用于写入字节数据,还有一个读端用于根据先入先出的次序读入这种字节数据。而这种字节数据可能代表任何东西:数字、员工记录、数字影片等等。
管线有两种类型,命名管线和无名管线,都可以交互式的在命令行或程序中使用它们;相关的事例在下边展示。这篇文章也将介绍显存队列,虽然它们有些过时了,但它们不应当受这样的待遇。
在本系列的第一篇文章中的示例代码承认了在IPC中可能遭到竞争条件(不管是基于文件的还是基于显存的)的恐吓。自然地我们也会考虑基于管线的IPC的安全并发问题,这个也将在本文中提到。针对管线和显存队列的事例将会使用POSIX推荐使用的API,POSIX的一个核心目标就是线程安全。
请查看一些mq_open函数的man页,这个函数属于显存队列的API。这个man页中有关特点的章节带有一个小表格:
插口特点值
mq_open()
线程安全
MT-Safe
里面的MT-Safe(MT指的是多线程multi-threaded)意味着mq_open函数是线程安全的,从而暗示是进程安全的:一个进程的执行和它的一个线程执行的过程类似,如果竞争条件不会发生在处于相同进程的线程中,这么这样的条件也不会发生在处于不同进程的线程中。MT-Safe特点保证了调用mq_open时不会出现竞争条件。通常来说,基于通道的IPC是并发安全的,虽然在下边事例中会出现一个有关警告的注意事项。
无名管线
首先让我们通过一个特意构造的命令行事例来展示无名管线是怎样工作的。在所有的现代系统中,符号|在命令行中都代表一个无名管路。假定我们的命令行提示符为%,接出来考虑下边的命令:
## 写入方在 | 左边,读取方在右边
% sleep 5 | echo "Hello, world!"
sleep和echo程序以不同的进程执行,无名管线容许它们进行通讯。并且里面的事例被特意设计为没有通讯发生。祝福语“Hello,world!”出现在屏幕中,之后过了5秒后,命令行返回linux waitpid 头文件,暗示sleep和echo进程都早已结束了。这期间发生了哪些呢?
在命令行中的竖线|的句型中,左侧的进程(sleep)是写入方,右侧的进程(echo)为读取方。默认情况下,读取方将会阻塞,直至从通道中才能读取到字节数据,而写入方在写完它的字节数据后,将发送流已中止end-of-stream的标志。(虽然写入方过早中止了,一个流已中止的标志还是会发给读取方。)无名管线将保持到写入方和读取方都停止的那种时刻。
在前面的反例中,sleep进程并没有向通道写入任何的字节数据,但在5秒后就中止了,这时将向通道发送一个流已中止的标志。与此同时,echo进程立刻向标准输出(屏幕)写入祝福语,由于这个进程并不从通道中读入任何字节,所以它并没有等待。一旦sleep和echo进程都中止了,不会再用作通讯的无名管线将会消失之后返回命令行提示符。
下边这个愈加实用的示例将使用两个无名管线。我们假设文件test.dat的内容如下:
this
is
the
way
the
world
ends
下边的命令:
% cat test.dat | sort | uniq
会将cat(联接concatenate的简写)进程的输出通过管线传给sort进程以生成排序后的输出,之后将排序后的输出通过管线传给uniq进程以去除重复的记录(在本例中,会将两次出现的“the”缩减为一个):
ends
is
the
this
way
world
下边展示的情境展示的是一个带有两个进程的程序通过一个无名管线通讯来进行通讯。
示例1.两个进程通过一个无名管路来进行通讯
#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h> /* exit functions */
#include <unistd.h> /* read, write, pipe, _exit */
#include <string.h>
#define ReadEnd 0
#define WriteEnd 1
void report_and_exit(const char* msg) {
[perror][6](msg);
[exit][7](-1); /** failure **/
}
int main() {
int pipeFDs[2]; /* two file descriptors */
char buf; /* 1-byte buffer */
const char* msg = "Nature's first green is goldn"; /* bytes to write */
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
pid_t cpid = fork(); /* fork a child process */
if (cpid < 0) report_and_exit("fork"); /* check for failure */
if (0 == cpid) { /*** child ***/ /* child process */
close(pipeFDs[WriteEnd]); /* child reads, doesn't write */
while (read(pipeFDs[ReadEnd], &buf, 1) > 0) /* read until end of byte stream */
write(STDOUT_FILENO, &buf, sizeof(buf)); /* echo to the standard output */
close(pipeFDs[ReadEnd]); /* close the ReadEnd: all done */
_exit(0); /* exit and notify parent at once */
}
else { /*** parent ***/
close(pipeFDs[ReadEnd]); /* parent writes, doesn't read */
write(pipeFDs[WriteEnd], msg, [strlen][8](msg)); /* write the bytes to the pipe */
close(pipeFDs[WriteEnd]); /* done writing: generate eof */
wait(NULL); /* wait for child to exit */
[exit][7](0); /* exit normally */
}
return 0;
}
里面名为pipeUN的程序使用系统函数fork来创建一个进程。虽然这个程序只有一个单一的源文件,在它正确执行的情况下将会发生多进程的情况。
下边的内容是对库函数fork怎样工作的一个简略回顾:
◈fork函数由父进程调用,在失败时返回-1给父进程。在pipeUN这个事例中,相应的调用是:
pid_t cpid = fork(); /* called in parent */
函数调用后的返回值也被保存出来了。在这个事例中,保存在整数类型pid_t的变量cpid中。(每位进程有它自己的进程ID,这是一个非负的整数,拿来标记进程)。复刻一个新的进程可能会由于多种缘由而失败,包括进程表满了的缘由,这个结构由系统维持,借此来追踪进程状态。明晰地说,僵尸进程如果没有被处理掉,将可能导致进程表被塞满的错误。
◈如果fork调用成功,则它将创建一个新的子进程,向父进程返回一个值,向子进程返回另外的一个值。在调用fork后父进程和子进程都将执行相同的代码。(子进程承继了到此为止父进程中申明的所有变量的拷贝),非常地,一次成功的fork调用将返回如下的东西:
◈向子进程返回0
◈向父进程返回子进程的进程ID
◈在一次成功的fork调用后,一个if/else或等价的结构将会被拿来隔离针对父进程和子进程的代码。在这个事例中,相应的申明为:
if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
}
如果成功地复刻出了一个子进程,pipeUN程序将像下边这样去执行。在一个整数的数列里:
int pipeFDs[2]; /* two file descriptors */
来保存两个文件描述符,一个拿来向管路中写入,另一个从管线中写入。(链表元素pipeFDs[0]是读端的文件描述符,元素pipeFDs[1]是写端的文件描述符。)在调用fork之前,对系统pipe函数的成功调用,将立即促使这个字段获得两个文件描述符:
if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
父进程和子进程如今都有了文件描述符的副本。但分离关注点模式意味着每位进程正好只须要一个描述符。在这个事例中,父进程负责写入,而子进程负责读取,虽然这样的角色分配可以反过来。在if谓词中的第一个句子将用于关掉管线的读端:
close(pipeFDs[WriteEnd]); /* called in child code */
在父进程中的else谓词将会关掉管线的读端:
close(pipeFDs[ReadEnd]); /* called in parent code */
之后父进程将向无名管线中写入个别字节数据(ASCII代码),子进程读取这种数据,之后向标准输出中回放它们。
在这个程序中还须要澄清的一点是在父进程代码中的wait函数。一旦被创建后,子进程很大程度上独立于它的父进程,正如简略的pipeUN程序所展示的那样。子进程可以执行任意的代码,而它们可能与父进程完全没有关系。并且,如果当子进程中止时,系统将会通过一个讯号来通知父进程。
要是父进程在子进程之前中止又该怎么呢?在这些情形下,除非采取了防治举措,子进程将会弄成在进程表中的一个僵尸进程。防治举措有两大类型:第一种是让父进程去通知系统,告诉系统它对子进程的中止没有任何兴趣:
signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */
第二种方式是在子进程中止时,让父进程执行一个wait。这样就确保了父进程可以独立于子进程而存在。在pipeUN程序中使用了第二种方式,其中父进程的代码使用的是下边的调用:
wait(NULL); /* called in parent */
这个对wait的调用意味着仍然等待直至任意一个子进程的中止发生,因而在pipeUN程序中,只有一个子进程。(其中的NULL参数可以被替换为一个保存有子程序退出状态的整数变量的地址。)对于更细细度的控制,还可以使用更灵活的waitpid函数,比如非常指定多个子进程中的某一个。
pipeUN将会采取另一个防治举措。当父进程结束了等待,父进程将会调用常规的exit函数去退出。对应的,子进程将会调用_exit变种来退出,这类变种将快速跟踪中止相关的通知。在疗效上,子进程会告诉系统立即去通知父进程它的这个子进程早已中止了。
如果两个进程向相同的无名管线中写入内容,字节数据会交错吗?比如,如果进程P1向管线写入内容:
foo bar
同时进程P2并发地写入:
baz baz
到相同的管线,最后的结果显然是管线中的内容将会是任意错乱的,比如像这样:
baz foo baz bar
只要没有写入超过PIPE_BUF字节,POSIX标准才能确保写入不会交错。在Linux系统中,PIPE_BUF的大小是4096字节。对于管线我更喜欢只有一个写入方和一个读取方,因而绕开这个问题。
命名管线
无名管线没有备份文件:系统将维持一个显存缓存来将字节数据从写方传给读方。一旦写方和读方中止,这个缓存将会被回收,因而无名管线消失。相反的,命名管线有备份文件和一个不同的API。
下边让我们通过另一个命令行示例来了解命名管线的要点。下边是具体的步骤:
◈开启两个终端。这两个终端的工作目录应当相同。
◈在其中一个终端中,键入下边的两个命令(命令行提示符始终是%,我的注释以##打头。):
% mkfifo tester ## 创建一个备份文件,名为 tester
% cat tester ## 将管道的内容输出到 stdout
在最开始,没有任何东西会出现在终端中,由于到现今为止没有在命名管路中写入任何东西。
◈在第二个终端中输入下边的命令:
% cat > tester ## redirect keyboard input to the pipe
hello, world! ## then hit Return key
bye, bye ## ditto
<Control-C> ## terminate session with a Control-C
无论在这个终端中输入哪些,它就会在另一个终端中显示下来。一旦键入Ctrl+C,才会回到正常的命令行提示符,由于管线早已被关掉了。
◈通过移除实现命名管线的文件来进行清除:
% unlink tester
正如mkfifo程序的名子所暗示的那样,命名管线也被称作FIFO,由于第一个步入的字节,都会第一个出linux waitpid 头文件,其他的类似。有一个名为mkfifo的库函数,用它可以在程序中创建一个命名管线,它将在下一个示例中被用到,该示例由两个进程组成:一个向命名管线写入,而另一个从该管线读取。
示例2.fifoWriter程序
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>
#define MaxLoops 12000 /* outer loop */
#define ChunkSize 16 /* how many written at a time */
#define IntsPerChunk 4 /* four 4-byte ints per chunk */
#define MaxZs 250 /* max microseconds to sleep */
int main() {
const char* pipeName = "./fifoChannel";
mkfifo(pipeName, 0666); /* read/write for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
if (fd < 0) return -1; /** error **/
int i;
for (i = 0; i < MaxLoops; i++) { /* write MaxWrites times */
int j;
for (j = 0; j < ChunkSize; j++) { /* each time, write ChunkSize bytes */
int k;
int chunk[IntsPerChunk];
for (k = 0; k < IntsPerChunk; k++)
chunk[k] = [rand][9]();
write(fd, chunk, sizeof(chunk));
}
usleep(([rand][9]() % MaxZs) + 1); /* pause a bit for realism */
}
close(fd); /* close pipe: generates an end-of-file */
unlink(pipeName); /* unlink from the implementing file */
[printf][10]("%i ints sent to the pipe.n", MaxLoops * ChunkSize * IntsPerChunk);
return 0;
}
里面的fifoWriter程序可以被总结为如下:
◈首先程序创建了一个命名管线拿来写入数据:
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);
其中的pipeName是备份文件的名子,传递给mkfifo作为它的第一个参数。接着命名管线通过我们熟悉的open函数调用被打开,而这个函数将会返回一个文件描述符。
◈在实现层面上,fifoWriter不会一次性将所有的数据都写入,而是写入一个块,之后休息随机数量的毫秒时间,接着再循环往复。总的来说,有768000个4字节整数值被写入到命名管线中。
◈在关掉命名管线后,fifoWriter也将使用unlink取消对该文件的联接。
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */
一旦联接到管线的每位进程都执行了unlink操作后,系统将回收那些备份文件。在这个事例中,只有两个这样的进程fifoWriter和fifoReader,它们都做了unlink操作。
这个两个程序应当在不同终端的相同工作目录中执行。并且fifoWriter应当在fifoReader之前被启动,由于须要fifoWriter去创建管路。之后fifoReader才才能获取到刚被创建的命名管路。
示例3.fifoReader程序
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
unsigned is_prime(unsigned n) { /* not pretty, but gets the job done efficiently */
if (n <= 3) return n > 1;
if (0 == (n % 2) || 0 == (n % 3)) return 0;
unsigned i;
for (i = 5; (i * i) <= n; i += 6)
if (0 == (n % i) || 0 == (n % (i + 2))) return 0;
return 1; /* found a prime! */
}
int main() {
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
if (fd < 0) return -1; /* no point in continuing */
unsigned count = 0, total = 0, primes_count = 0;
while (1) {
int next;
int i;
ssize_t count = read(fd, &next, sizeof(int));
if (0 == count) break; /* end of stream */
else if (count == sizeof(int)) { /* read a 4-byte int value */
total++;
if (is_prime(next)) primes_count++;
}
}
close(fd); /* close pipe from read end */
unlink(file); /* unlink from the underlying file */
[printf][10]("Received ints: %u, primes: %un", total, primes_count);
return 0;
}
里面的fifoReader的内容可以总结为如下:
◈由于fifoWriter早已创建了命名管线,所以fifoReader只须要借助标准的open调用来通过备份文件来获取到管线中的内容:
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);
这个文件的是以只读打开的。
◈之后这个程序步入一个潜在的无限循环,在每次循环时,尝试读取4字节的块。read调用:
ssize_t count = read(fd, &next, sizeof(int));
返回0来暗示该流的结束。在这些情况下,fifoReader跳出循环,关掉命名管线,并在中止前unlink备份文件。
◈在读入4字节整数后,fifoReader检测这个数是否为素数。这个操作代表了一个生产级别的读取器可能在接收到的字节数据上执行的逻辑操作。在示例运行中,在接收到的768000个整数中有37682个素数。
重复运行示例,fifoReader将成功地读取fifoWriter写入的所有字节。这不是很让人惊叹的。这两个进程在相同的机器上执行,因而可以不用考虑网路相关的问题。命名管线是一个可信且高效的IPC机制,因此被广泛使用。
下边是这两个程序的输出,它们在不同的终端中启动,但处于相同的工作目录:
% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682
消息队列
管线有着严格的先入先出行为:第一个被写入的字节将会第一个被读,第二个写入的字节将第二个被读,以这种推。消息队列可以作出相同的表现,但它又足够灵活linux之家,可以促使字节块可以不以先入先出的顺序来接收。
正如它的名子所提示的那样,消息队列是一系列的消息,每位消息包含两部份:
◈载荷,一个字节序列(在C中是char)
◈类型,以一个正整数值的方式给定,类型拿来分类消息,为了更灵活的回收
看一下下边对一个消息队列的描述,每位消息由一个整数类型标记:
+-+ +-+ +-+ +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
+-+ +-+ +-+ +-+
在里面展示的4个消息中,标记为1的是开头,即最接近接收端,之后另个标记为2的消息,最后接着一个标记为3的消息。如果依照严格的FIFO行为执行,消息将会以1-2-2-3这样的顺序被接收。并且消息队列容许其他缴纳顺序。诸如,消息可以被接收方以3-2-1-2的顺序接收。
mqueue示例包含两个程序,sender将向消息队列中写入数据,而receiver将从这个队列中读取数据。这两个程序都包含的头文件queue.h如下所示:
示例4.头文件queue.h
#define ProjectId 123
#define PathName "queue.h" /* any existing, accessible file would do */
#define MsgLen 4
#define MsgCount 6
typedef struct {
long type; /* must be of type long */
char payload[MsgLen + 1]; /* bytes in the message */
} queuedMessage;
里面的头文件定义了一个名为queuedMessage的结构类型,它带有payload(字节链表)和type(整数)这两个域。该文件也定义了一些符号常数(使用#define句子),前两个常数被拿来生成一个key,而这个key反过来被拿来获取一个消息队列的ID。ProjectId可以是任何正整数值,而PathName必须是一个存在的、可访问的文件,在这个示例中,指的是文件queue.h。在sender和receiver中,它们都有的设定句子为:
key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */
IDqid在疗效上是消息队列文件描述符的对应物。
示例5.sender程序
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"
void report_and_exit(const char* msg) {
[perror][6](msg);
[exit][7](-1); /* EXIT_FAILURE */
}
int main() {
key_t key = ftok(PathName, ProjectId);
if (key < 0) report_and_exit("couldn't get key...");
int qid = msgget(key, 0666 | IPC_CREAT);
if (qid < 0) report_and_exit("couldn't get queue id...");
char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
int i;
for (i = 0; i < MsgCount; i++) {
/* build the message */
queuedMessage msg;
msg.type = types[i];
[strcpy][11](msg.payload, payloads[i]);
/* send the message */
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
[printf][10]("%s sent as type %in", msg.payload, (int) msg.type);
}
return 0;
}
里面的sender程序将发送出6个消息,每两个为一个类型:前两个是类型1,接着的连个是类型2linux系统安装教程,最后的两个为类型3。发送的句子:
msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);
被配置为非阻塞的(IPC_NOWAIT标志),是由于这儿的消息体量上都很小。惟一的危险在于一个完整的序列将可能造成发送失败,而这个反例不会。下边的receiver程序也将使用IPC_NOWAIT标志来接收消息。
示例6.receiver程序
#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"
void report_and_exit(const char* msg) {
[perror][6](msg);
[exit][7](-1); /* EXIT_FAILURE */
}
int main() {
key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
if (key < 0) report_and_exit("key not gotten...");
int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
if (qid < 0) report_and_exit("no access to queue...");
int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
int i;
for (i = 0; i < MsgCount; i++) {
queuedMessage msg; /* defined in queue.h */
if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
[puts][12]("msgrcv trouble...");
[printf][10]("%s received as type %in", msg.payload, (int) msg.type);
}
/** remove the queue **/
if (msgctl(qid, IPC_RMID, NULL) < 0) /* NULL = 'no flags' */
report_and_exit("trouble removing queue...");
return 0;
}
这个receiver程序不会创建消息队列,虽然API虽然建议那样。在receiver中,对
int qid = msgget(key, 0666 | IPC_CREAT);
的调用可能由于带有IPC_CREAT标志而具有欺骗性,而且这个标志的真实意义是假如须要就创建,否则直接获取。sender程序调用msgsnd来发送消息,而receiver调用msgrcv来接收它们。在这个事例中,sender以1-1-2-2-3-3的顺序发送消息,但receiver接收它们的顺序为3-1-2-1-3-2,这显示消息队列没有被严格的FIFO行为所屈从:
% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3
% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2
里面的输出显示sender和receiver可以在同一个终端中启动。输出也显示消息队列是持久的,尽管sender进程在完成创建队列、向队列写数据、然后退出的整个过程后,该队列依旧存在。只有在receiver进程显式地调用msgctl来移除该队列,这个队列就会消失:
if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */
总结
管线和消息队列的API在根本上来说都是双向的:一个进程写,之后另一个进程读。其实还存在单向命名管线的实现,但我觉得这个IPC机制在它最为简单的时侯反倒是最佳的。正如上面提及的那样,消息队列早已不大受欢迎了,虽然没有找到哪些非常好的诱因来解释这个现象;而队列一直是IPC工具箱中的一个工具。这个快速的IPC工具箱之旅将以第3部份(通过套接字和讯号来示例IPC)来终结。
via:
作者:MartyKalin选题:lujun9972译者:FSSlc校对:wxy
本文由LCTT原创编译,Linux中国荣誉推出