进程间通信(IPC) 系列 - 管道(pipe)

今天从源码的角度分析下进程间通信之管道(pipe)。


什么是管道 ?


所谓管道,是指用于连接一个读进程和一个写进程,以实现它们之间通信的共享文件,又称 pipe 文件。

向管道(共享文件)提供输入的发送进程(即写进程),以字符流形式将大量的数据送入管道;而接收管道输出的接收进程(即读进程),可从管道中接收数据。由于发送进程和接收进程是利用管道进行通信的,故又称管道通信。

为了协调双方的通信,管道通信机制必须提供以下3 方面的协调能力。

管道的应用


管道是利用 pipe() 系统调用而不是利用 open() 系统调用建立的。pipe() 调用的原型是:

int pipe(int fd[2])

我们看到,有两个文件描述符与管道结合在一起,一个文件描述符用于管道的read() 端,一个文件描述符用于管道的 write() 端。

由于一个函数调用不能返回两个值,pipe() 的参数是指向两个元素的整型数组的指针,它将由调用两个所要求的文件描述符填入。

fd[0] 元素将含有管道 read() 端的文件描述符,而 fd[1] 含有管道 write() 端的文件描述符。系统可根据 fd[0]fd[1] 分别找到对应的 file 结构。

注意,在 pipe 的参数中,没有路径名,这表明,创建管道并不像创建文件一样,要为它创建一个目录连接。这样做的好处是,其他现存的进程无法得到该管道的文件描述符,从而不能访问它。

那么,两个进程如何使用一个管道来通信呢?

我们知道,fork()exec() 系统调用可以保证文件描述符的复制品既可供双亲进程使用,也可供它的子女进程使用。也就是说,一个进程用 pipe() 系统调用创建管道,然后用 fork() 调用创建一个或多个进程,那么,管道的文件描述符将可供所有这些进程使用。

这里更明确的含义是:一个普通的管道仅可供具有共同祖先的两个进程之间共享,并且这个祖先必须已经建立了供它们使用的管道

注意,在管道中的数据始终以和写数据相同的次序来进行读,这表示 lseek()系统调用对管道不起作用。

下面给出在两个进程之间设置和使用管道的简单程序:


#include 
#include 
#include 

int main(void)
{
int fd[2], nbytes;
pid_t childpid;
char string[] = "Hello, world!
";
char readbuffer[80];

pipe(fd);

if((childpid = fork()) == -1)
{
printf("Error:fork");
exit(1);
}

if(childpid == 0) /* 子进程是管道的读进程 */
{
close(fd[1]); /*关闭管道的写端 */
nbytes = read(fd[0], readbuffer, sizeof(readbuffer));
printf("Received string: %s", readbuffer);
exit(0);
}
else /* 父进程是管道的写进程 */
{
close(fd[0]); /*关闭管道的读端 */
write(fd[1], string, strlen(string)); 
}

return(0);
}

注意,在这个例子中,为什么这两个进程都关闭它所不需的管道端呢?

这是因为写进程完全关闭管道端时,文件结束的条件被正确地传递给读进程。而读进程完全关闭管道端时,写进程无需等待继续写数据。

阻塞读和写分别成为对空和满管道的默认操作,这些默认操作也可以改变,这就需要调用 fcntl() 系统调用,对管道文件描述符设置 O_NONBLOCK 标志可以忽略默认操作:

#include 

fcntl(fd,F_SETFL,O_NONBlOCK);


上述例子如下图:


管道的源码分析


pipefs 初始化


pipefs 是一种简单的、虚拟的文件系统类型,因为它没有对应的物理设备,因此其安装时不需要块设备。大部分文件系统是以模块的形式来实现的。该文件系统相关的代码在 fs/pipe.c 中:


static struct file_system_type pipe_fs_type = {
.name = "pipefs",
.get_sb = pipefs_get_sb,
.kill_sb = kill_anon_super,
};


static int __init init_pipe_fs(void)
{
/*
pipe_fs_type 链接到file_systems 链表可以通过读/proc/filesystems 找到 
“pipefs”入口点,在那里,“nodev”标志表示没有设置
FS_REQUIRES_DEV 标志,即该文件系统没有对应的物理设备。
*/
int err = register_filesystem(&pipe_fs_type);
if (!err) {
//安装pipefs 文件系统
pipe_mnt = kern_mount(&pipe_fs_type);
if (IS_ERR(pipe_mnt)) {
err = PTR_ERR(pipe_mnt);
unregister_filesystem(&pipe_fs_type);
}
}

return err;
}


//pipefs 文件系统是作为一个模块来安装的
fs_initcall(init_pipe_fs);
module_exit(exit_pipe_fs); //模块卸载函数

上述就是初始化时注册 pipefs 文件系统的过程,操作如下:


pipe 的创建


pipefs 文件系统的入口点就是pipe()系统调用,其内核实现函数为sys_pipe(),而真正的工作是调用 do_pipe() 函数来完成的,其代码在 fs/pipe.c 中:


int do_pipe(int *fd)
{
    struct file *fw, *fr;
    int fdw, fdr;

    //创建管道写端的file结构
    fw = create_write_pipe();   

    //在写端的file结构基础上构建读端
    fr = create_read_pipe(fw);

    //创建读端fd
    fdr = get_unused_fd();

    //创建写端fd
    fdw = get_unused_fd();

    //fd 和 file进行关联
    fd_install(fdr, fr);
    fd_install(fdw, fw);

    //返回读写端fd
    fd[0] = fdr;
    fd[1] = fdw;

    ...

    return 0;
}


struct file *create_write_pipe(void)
{
    ...

    struct qstr name = { .name = "" };

    //创建 file 结构
    f = get_empty_filp();

    //创建一个pipe相关的 inode
    inode = get_pipe_inode();

    //创建一个dentry结构
    dentry = d_alloc(pipe_mnt->mnt_sb->s_root, &name);

    //inode 和 dentry 相关联
    d_instantiate(dentry, inode);

    // pipe 和 pipe_mnt 关联
    f->f_path.mnt = mntget(pipe_mnt);

    //file 与 dentry 相关联
    f->f_path.dentry = dentry;

    //该file是只写的
    f->f_flags = O_WRONLY;

    //该pipe的可操作方法
    f->f_op = &write_pipe_fops;

    ...

    return f;
}



struct file *create_read_pipe(struct file *wrf)
{
    //创建一个file结构用于读
    struct file *f = get_empty_filp();

    //file 与 已有的dentry、inode、struct vfsmount 相关联
    f->f_path.mnt = mntget(wrf->f_path.mnt);
    f->f_path.dentry = dget(wrf->f_path.dentry);
    f->f_mapping = wrf->f_path.dentry->d_inode->i_mapping;

    //该file是只读的
    f->f_flags = O_RDONLY;
    f->f_op = &read_pipe_fops;

    f->f_mode = FMODE_READ;
    f->f_version = 0;

    return f;
}


do_pipe() 的操作也很简单,操作如下:


上述 pipe 的初始化中,创建的 pipe_inode_info 对象记录了 pipe 读写过程中所有的得数据。pipe_inode_info 对象的结构如下

struct pipe_inode_info {
wait_queue_head_t wait;//存储等待读写进程的等待队列
/* nrbufs: 写入但还未被读取的数据占用缓冲区的页数
curbuf:当前正在读取环形缓冲区中的页节点
*/
unsigned int nrbufs, curbuf;
struct page *tmp_page; //临时缓冲区页面
unsigned int readers; //正在读取pipe的读进程数目
unsigned int writers; //正在写pipe的写进程数目
unsigned int waiting_writers; //等待管道可以写的进程数目
...
struct inode *inode; //pipe 对应的inode结构
struct pipe_buffer bufs[PIPE_BUFFERS]; //环形缓冲区,每个元素对应一个内存页
};

经过上述的一系列初始化,整个管道的内存结构如下图所示



pipe_read


static ssize_t pipe_read(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t pos)
{
...

//要读的数据长度
total_len = iov_length(iov, nr_segs);

do_wakeup = 0;
ret = 0;
//读之前先加锁
mutex_lock(&inode->i_mutex);

//循环读数据
for (;;) {
int bufs = pipe->nrbufs;
if (bufs) {
int curbuf = pipe->curbuf;
struct pipe_buffer *buf = pipe->bufs + curbuf;
size_t chars = buf->len;

//待读的数据比要读的数据多,则设置要读的长度
if (chars > total_len)
chars = total_len;

error = ops->confirm(pipe, buf);
if (error) {
if (!ret)
error = ret;
break;
}

atomic = !iov_fault_in_pages_write(iov, chars);
redo: 
//把数据拷贝到用户缓冲区
addr = ops->map(pipe, buf, atomic);
error = pipe_iov_copy_to_user(iov, addr + buf->offset, chars, atomic);
ops->unmap(pipe, buf, addr);
if (unlikely(error)) {
...
}

ret += chars;
buf->offset += chars;
buf->len -= chars;
//该页的数据全部读完,释放该page
if (!buf->len) {
buf->ops = NULL;
ops->release(pipe, buf);
curbuf = (curbuf + 1) & (PIPE_BUFFERS-1);
pipe->curbuf = curbuf;
pipe->nrbufs = --bufs;
do_wakeup = 1;
}
total_len -= chars;
if (!total_len)
break; /* common path: read succeeded */
} //if (bufs)
//若该页没读完,继续循环读,若该page读完,则读下一个page
if (bufs) /* More to do? */
continue;

if (!pipe->writers)
break;

if (!pipe->waiting_writers) {
if (ret)
break;
//非阻塞跳出循环,不进行休眠
if (filp->f_flags & O_NONBLOCK) {
ret = -EAGAIN;
break;
}
}

if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}
//唤醒写进程
if (do_wakeup) {
wake_up_interruptible_sync(&pipe->wait);
kill_fasync(&pipe->fasync_writers, SIGIO,POLL_OUT);
}

//让出cpu,进行休眠,等待条件唤醒
pipe_wait(pipe);

} // for

mutex_unlock(&inode->i_mutex);

if (do_wakeup) {
wake_up_interruptible_sync(&pipe->wait);
kill_fasync(&pipe->fasync_writers, SIGIO, POLL_OUT);
}
if (ret > 0)
file_accessed(filp);

return ret;
}


读的操作很简单,操作如下:

读取数据的过程如下



pipe_write


static ssize_t pipe_write(struct kiocb *iocb, const struct iovec *_iov, unsigned long nr_segs, loff_t ppos)
{
...

//需要写入数据的长度
total_len = iov_length(iov, nr_segs); 
do_wakeup = 0;

mutex_lock(&inode->i_mutex);

//管道读端已关闭,返回SIGPIPE信号
if (!pipe->readers) {
send_sig(SIGPIPE, current, 0);
ret = -EPIPE;
goto out;
}

/* We try to merge small writes 
curbuf:当前的pipe缓冲节点
nrbufs:非空的pipe缓冲节点数目
buffers:buf缓冲区总数目
buf->offset:页内可用数据的偏移量
buf->len :可用数据的长度
buf->offset + buf->len :页内可以往有效数据后追加数据的下标

*/
//获取完整页之外的数量
chars = total_len & (PAGE_SIZE-1); /* size of the last buffer */
if (pipe->nrbufs && chars != 0) {
//获取下一个可用的缓冲区,比如缓冲区是0~5, 有效数据起始buff是3,有效数据是4,那么存储buff数据依次为3,4,5,0,下一个可用的buff为1 ((3+4-1)/5)
int lastbuf = (pipe->curbuf + pipe->nrbufs - 1) & (PIPE_BUFFERS-1);
//获取下一个可用的buff,lastbuf为获取到的索引
struct pipe_buffer *buf = pipe->bufs + lastbuf; 
const struct pipe_buf_operations *ops = buf->ops;
/*offset: page中数据的偏移量。len:page中数据的长度
目前数据在page中的有效起始地址 + 有效数据长度 = 下一个可存放数据的地址。
管道是从前往后读的,并没规定读写大小,有可能只读取了page的前一部分,中间部分尚未读取,但是写的时候必须从中间有效数据后继续写
*/
int offset = buf->offset + buf->len;
//当前需要写入的数据 + 已有的数据若没有超过PAGE_SIZE大小,则拷贝到page中
if (ops->can_merge && offset + chars <= PAGE_SIZE) {
...
redo1:
addr = ops->map(pipe, buf, atomic);
//将用户数据拷贝到page中
error = pipe_iov_copy_from_user(offset + addr, iov, chars, atomic);
ops->unmap(pipe, buf, addr);
ret = error;
do_wakeup = 1;
if (error) {
...
}
//更新有效数据
buf->len += chars;
total_len -= chars;
ret = chars;
//全拷贝完则跳出
if (!total_len)
goto out;
}
}

for (;;) {
int bufs;

if (!pipe->readers) {
...
}
/*获取管道还有多少有效的buffer缓冲区*/
bufs = pipe->nrbufs;
if (bufs < PIPE_BUFFERS) { //有效的bufs小于缓冲区总数
int newbuf = (pipe->curbuf + bufs) & (PIPE_BUFFERS-1); //获取下一个可用的buf
struct pipe_buffer *buf = pipe->bufs + newbuf;
struct page *page = pipe->tmp_page;

if (!page) {
page = alloc_page(GFP_HIGHUSER);
if (unlikely(!page)) {
...
}
pipe->tmp_page = page;
}

do_wakeup = 1;
chars = PAGE_SIZE;
if (chars > total_len)
chars = total_len;

iov_fault_in_pages_read(iov, chars);
redo2:

error = pipe_iov_copy_from_user(src, iov, chars, atomic);

ret += chars;
//更新有效数据 
buf->page = page;
buf->ops = &anon_pipe_buf_ops;
buf->offset = 0;
buf->len = chars;
pipe->nrbufs = ++bufs;
pipe->tmp_page = NULL;

total_len -= chars;
//数据写完,跳出循环结束
if (!total_len)
break;
}
//还有可用的缓冲区,继续写
if (bufs < PIPE_BUFFERS)
continue;
//缓冲区全部写完了,则判断是否需要阻塞休眠
if (filp->f_flags & O_NONBLOCK) {
if (!ret)
ret = -EAGAIN;
break;
}

if (signal_pending(current)) {
if (!ret)
ret = -ERESTARTSYS;
break;
}
//唤醒读进程
if (do_wakeup) {
wake_up_interruptible_sync(&pipe->wait);
kill_fasync(&pipe->fasync_readers, SIGIO, POLL_IN);
do_wakeup = 0;
}
//写进程休眠
pipe->waiting_writers++;
pipe_wait(pipe);//将当前任务加入到等待列表,释放锁,让出CPU
pipe->waiting_writers--;
}
out:
...

return ret;
}

管道写操作流程如下:

往管道中写数据的过程如下



管道中最重要的2个方法就是管道的读写。从上述的分析来看,读写进程共同操作内核中的数据缓冲区,若有缓冲区可写,则进程往缓冲区中写,若条件不允许写,则进程休眠让出 CPU。读操作同理。


从上述管道读写操作可知,父子进程之所以能够通过 pipe 进行通信,是因为在内核中共同指向了同一个pipe_inode_info 对象,共同操作同一个内存页。


总结


展开阅读全文

页面更新:2024-03-01

标签:管道   进程   通信   缓冲区   文件系统   结构   操作   文件   系列   数据   系统

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top