Linux内核学习笔记《剖析RCU原理机制》

一、什么是RCU机制

RCU机制是Linux2.6之后提供的一种数据一致性访问的机制,从RCU(read-copy-update)的名称上看,我们就能对他的实现机制有一个大概的了解,在修改数据的时候,首先需要读取数据,然后生成一个副本,对副本进行修改,修改完成之后再将老数据update成新的数据,此所谓RCU。

RCU 是Linux中比较重要的一种同步机制。顾名思义就是== " 读,拷贝更新" ==,该机制记录了指向共享数据结构的指针的所有使用者,在该结构将要改变的时候,首先创建一个副本,在副本中修改,在所有进行读访问的使用者结束对旧副本的读取之后,指针可以替换指向新的,修改后的副本的指针,这种机制可以允许并发读写。

在操作系统中,数据一致性访问是一个非常重要的部分,通常我们可以采用锁机制实现数据的一致性访问。例如,semaphore、spinlock机制,在访问共享数据时,首先访问锁资源,在获取锁资源的前提下才能实现数据的访问。这种原理很简单,根本的思想就是在访问临界资源时,首先访问一个全局的变量(锁),通过全局变量的状态来控制线程对临界资源的访问。但是,这种思想是需要硬件支持的,硬件需要配合实现全局变量(锁)的读-修改-写,现代CPU都会提供这样的原子化指令。采用锁机制实现数据访问的一致性存在如下两个问题:

效率问题。锁机制的实现需要对内存的原子化访问,这种访问操作会破坏流水线操作,降低了流水线效率。这是影响性能的一个因素。另外,在采用读写锁机制的情况下,写锁是排他锁,无法实现写锁与读锁的并发操作,在某些应用下会降低性能。

扩展性问题。当系统中CPU数量增多的时候,采用锁机制实现数据的同步访问效率偏低。并且随着CPU数量的增多,效率降低,由此可见锁机制实现的数据一致性访问扩展性差。

为了解决上述问题,Linux中引进了RCU机制。该机制在多CPU的平台上比较适用,对于读多写少的应用尤其适用。

1、历史背景 —— 原始的RCU思想

在多线程场景下,经常我们需要并发访问一个数据结构,为了保证线程安全我们会考虑使用互斥设施来进行同步,更进一步我们会根据对这个数据结构的读写比例而选用读写锁进行优化。但是读写锁不是唯一的方式,我们可以借助于COW技术来做到写操作不需要加锁,也就是在读的时候正常读,写的时候,先加锁拷贝一份,然后进行写,写完就原子的更新回去,使用COW实现避免了频繁加读写锁本身的性能开销。

优点

1、由于 RCU 旨在最小化读取端开销,因此仅在以更高速率使用同步逻辑进行读取操作时才使用它。如果更新操作超过10%,性能反而会变差,所以应该选择另一种同步方式而不是RCU。

2、几乎没有读取端开销。零等待,零开销没有死锁问题没有优先级倒置问题(优先级倒置和优先级继承)无限制延迟没有问题无内存泄漏风险问题。

缺点

1、使用起来有点复杂对于写操作,它比其他同步技术稍慢适用场景。

2、基础架构 —— RCU算法设计

Linux 内核RCU 参考QSBR算法设计一套无锁同步机制。

多个读者可以并发访问共享数据,而不需要加锁;

写者更新共享数据时候,需要先copy副本,在副本上修改,最终,读者只访问原始数据,因此他们可以安全地访问数据,多个写者之间是需要用锁互斥访问的(比如用自旋锁);

修改资源后,需要更新共享资源,让后面读者可以访问最新的数据;

等旧资源上所有的读者都访问完毕后,就可以回收旧资源了

3、实现思路—— 读写回收实现思路

RCU 的关键思想有两个:1)复制后更新;2)延迟回收内存。典型的RCU更新时序如下:


1、对于读操作,可以直接对共享资源进行访问,但是前提是需要CPU支持访存操作的原子化,现代CPU对这一点都做了保证。但是RCU的读操作上下文是不可抢占的(这一点在下面解释),所以读访问共享资源时可以采用read_rcu_lock(),该函数的工作是停止抢占。

2、对于写操作,其需要将原来的老数据作一次备份(copy),然后对备份数据进行修改,修改完毕之后再用新数据更新老数据,更新老数据时采用了rcu_assign_pointer()宏,在该函数中首先屏障一下memory,然后修改老数据。这个操作完成之后,需要进行老数据资源的回收。操作线程向系统注册回收方法,等待回收。采用数据备份的方法可以实现读者与写者之间的并发操作,但是不能解决多个写者之间的同步,所以当存在多个写者时,需要通过锁机制对其进行互斥,也就是在同一时刻只能存在一个写者。

3、在RCU机制中存在一个垃圾回收的daemon,当共享资源被update之后,可以采用该daemon实现老数据资源的回收。回收时间点就是在update之前的所有的读者全部退出。由此可见写者在update之后是需要睡眠等待的,需要等待读者完成操作,如果在这个时刻读者被抢占或者睡眠,那么很可能会导致系统死锁。因为此时写者在等待读者,读者被抢占或者睡眠,如果正在运行的线程需要访问读者和写者已经占用的资源,那么死锁的条件就很有可能形成了。

4、实现思路 —— 实例说明

RCU(Read-Copy Update)是数据同步的一种方式,在当前的Linux内核中发挥着重要的作用。RCU主要针对的数据对象是链表,目的是提高遍历读取数据的效率,为了达到目的使用RCU机制读取数据的时候不对链表进行耗时的加锁操作。这样在同一时间可以有多个线程同时读取该链表,并且允许一个线程对链表进行修改(修改的时候,需要加锁)。RCU适用于需要频繁的读取数据,而相应修改数据并不多的情景,例如在文件系统中,经常需要查找定位目录,而对目录的修改相对来说并不多,这就是RCU发挥作用的最佳场景。

Linux内核源码当中,关于RCU的文档比较齐全,你可以在 /Documentation/RCU/ 目录下找到这些文件。Paul E. McKenney 是内核中RCU源码的主要实现者,他也写了很多RCU方面的文章。他把这些文章和一些关于RCU的论文的链接整理到了一起。http://www2.rdrop.com/users/paulmck/RCU/

在RCU的实现过程中,我们主要解决以下问题:

在读取过程中,另外一个线程删除了一个节点。删除线程可以把这个节点从链表中移除,但它不能直接销毁这个节点,必须等到所有的读取线程读取完成以后,才进行销毁操作。RCU中把这个过程称为宽限期(Grace period)。

在读取过程中,另外一个线程插入了一个新节点,而读线程读到了这个节点,那么需要保证读到的这个节点是完整的。这里涉及到了发布-订阅机制(Publish-Subscribe Mechanism)。

保证读取链表的完整性。新增或者删除一个节点,不至于导致遍历一个链表从中间断开。但是RCU并不保证一定能读到新增的节点或者不读到要被删除的节点。

宽限期

struct rcu_st {

int a;

char b;

long c;

};

DEFINE_SPINLOCK(rcu_st_mutex);

struct rcu_st *gbl_rcu_st = NULL;

void rcu_st_read(void)

{

rcu_st *fp = gbl_rcu_st;

if ( fp != NULL ) {

dosomething(fp->a, fp->b , fp->c );

}

}

void rcu_st_update(rcu_st* new_fp)

{

spin_lock(&rcu_st_mutex);

rcu_st *old_fp = gbl_rcu_st;

gbl_rcu_st = new_fp;

spin_unlock(&rcu_st_mutex);

kfee(old_fp);

}

如上的程序,是针对于全局变量gbl_rcu_st的操作。假设以下场景。有两个线程同时运行 rcu_st_read和rcu_st_update的时候,当 rcu_st_read执行完赋值操作后,线程发生切换;此时另一个线程开始执行rcu_st_update并执行完成。当rcu_st_read运行的进程切换回来后,运行dosomething 的时候,fp已经被删除,这将对系统造成危害。为了防止此类事件的发生,RCU里增加了一个新的概念叫宽限期(Grace period)。如下图所示:

Removal:在写端临界区部分,读取(Read()),进行复制(Copy),并执行更改(Update)操作;

Grace Period:这是一个等待期,以确保所有与执行删除的数据相关的reader访问完毕;

Reclamation:回收旧数据;

图中每行代表一个线程,最下面的一行是删除线程,当它执行完删除操作后,线程进入了宽限期。

宽限期的意义是,在一个删除动作发生后,它必须等待所有在宽限期开始前已经开始的读线程结束,才可以进行销毁操作。这样做的原因是这些线程有可能读到了要删除的元素。图中的宽限期必须等待1和2结束;而读线程5在宽限期开始前已经结束,不需要考虑;而3,4,6也不需要考虑,因为在宽限期结束后开始后的线程不可能读到已删除的元素。为此RCU机制提供了相应的API来实现这个功能。

void rcu_st_read(void)

{

rcu_read_lock();

rcu_st *fp = gbl_rcu_st;

if ( fp != NULL ) {

dosomething(fp->a, fp->b , fp->c);

}

rcu_read_unlock();

}

void rcu_st_update(rcu_st* new_fp)

{

spin_lock(&rcu_st_mutex);

rcu_st *old_fp = gbl_rcu_st;

gbl_rcu_st = new_fp;

spin_unlock(&rcu_st_mutex);

synchronize_rcu();

kfee(old_fp);

}

其中rcu_st_read中增加了rcu_read_lock和rcu_read_unlock,这两个函数用来标记一个RCU读过程的开始和结束。其实作用就是帮助检测宽限期是否结束。rcu_st_update增加了一个函数synchronize_rcu(),调用该函数意味着一个宽限期的开始,而直到宽限期结束,该函数才会返回。我们再对比着图看一看,线程1和2,在synchronize_rcu之前可能得到了旧的gbl_rcu_st,也就是rcu_st_update中的old_fp,如果不等它们运行结束,就调用kfee(old_fp),极有可能造成系统崩溃。而3,4,6在synchronize_rcu之后运行,此时它们已经不可能得到old_fp,此次的kfee将不对它们产生影响。

宽限期是RCU实现中最复杂的部分,原因是在提高读数据性能的同时,删除数据的性能也不能太差。

订阅——发布机制

当前使用的编译器大多会对代码做一定程度的优化,CPU也会对执行指令做一些优化调整,目的是提高代码的执行效率,但这样的优化,有时候会带来不期望的结果。如例:

void rcu_st_read(void)

{

rcu_read_lock();

rcu_st *fp = gbl_rcu_st;

if ( fp != NULL ) {

dosomething(fp->a, fp->b , fp->c);

}

rcu_read_unlock();

}

void rcu_st_update(rcu_st* new_fp)

{

spin_lock(&rcu_st_mutex);

rcu_st *old_fp = gbl_rcu_st;

new_fp->a = 1; /* Line 3 */

new_fp->b = ‘b’; /* Line 4 */

new_fp->c = 100; /* Line 5 */

gbl_rcu_st = new_fp; /* Line 6 */

spin_unlock(&rcu_st_mutex);

synchronize_rcu();

kfee(old_fp);

}

这段代码中,我们期望的是3,4,5行的代码在第6行代码之前执行。但优化后的代码并不对执行顺序做出保证。在这种情形下,一个读线程很可能读到new_fp,但new_fp的成员赋值还没执行完成。当读线程执行dosomething(fp->a, fp->b , fp->c) 的 时候,就有不确定的参数传入到dosomething,极有可能造成不期望的结果,甚至程序崩溃。可以通过优化屏障来解决该问题,RCU机制对优化屏障做了包装,提供了专用的API来解决该问题。这时候,第6行不再是直接的指针赋值,而应该改为 :

rcu_assign_pointer(gbl_rcu_st,new_fp);

rcu_assign_pointer的实现比较简单,如下:

#include

#define rcu_assign_pointer(p, v)

__rcu_assign_pointer((p), (v), __rcu)


#define __rcu_assign_pointer(p, v, space)

do {

smp_wmb();

(p) = (typeof(*v) __force space *)(v);

} while (0)

我们可以看到它的实现只是在赋值之前加了优化屏障 smp_wmb来确保代码的执行顺序。另外就是宏中用到的__rcu,只是作为编译过程的检测条件来使用的。

或者改成:

3,4,5的new_fp a,b,c赋值操作,可能在rcu_st_read,rcu_st *old_fp = gbl_rcu_st;赋值前还没有完成赋值,当他和rcu_st_update同时运行的时候,可能导致传入dosomething的一部分属于旧的gbl_rcu_st,而另外的属于新的。这样导致运行结果的错误。为了避免该类问题,RCU还是提供了宏来解决该问题:

#include

#define rcu_dereference(p) rcu_dereference_check(p, 0)

#define rcu_dereference_check(p, c)

__rcu_dereference_check((p), rcu_read_lock_held() || (c), __rcu)

#define __rcu_dereference_check(p, c, space)

({

typeof(*p) *_________p1 = (typeof(*p)*__force )ACCESS_ONCE(p);

rcu_lockdep_assert(c, "suspicious rcu_dereference_check()"

" usage");

rcu_dereference_sparse(p, space);

smp_read_barrier_depends();

((typeof(*p) __force __kernel *)(_________p1));

})


static inline int rcu_read_lock_held(void)

{

if (!debug_lockdep_rcu_enabled())

return 1;

if (rcu_is_cpu_idle())

return 0;

if (!rcu_lockdep_current_cpu_online())

return 0;

return lock_is_held(&rcu_lock_map);

}

在赋值后加入优化屏障smp_read_barrier_depends()

在rcu_st_read我们之前的第四行代码改为 foo *fp = rcu_dereference(gbl_foo);,就可以防止上述问题。

void rcu_st_read(void)

{

rcu_read_lock();

rcu_st *fp = rcu_dereference(gbl_rcu_st);

if ( fp != NULL ) {

dosomething(fp->a, fp->b , fp->c);

}

rcu_read_unlock();

}

数据读取的完整性

还是通过例子来说明这个问题:

如图我们在原list中加入一个节点new到A之前,所要做的第一步是将new的指针指向A节点,第二步才是将Head的指针指向new。这样做的目的是当插入操作完成第一步的时候,对于链表的读取并不产生影响,而执行完第二步的时候,读线程如果读到new节点,也可以继续遍历链表。

如果把这个过程反过来,第一步head指向new,而这时一个线程读到new,由于new的指针指向的是Null,这样将导致读线程无法读取到A,B等后续节点。从以上过程中,可以看出RCU并不保证读线程读取到new节点。如果该节点对程序产生影响,那么就需要外部调用做相应的调整。如在文件系统中,通过RCU定位后,如果查找不到相应节点,就会进行其它形式的查找。

如图我们希望删除B,这时候要做的就是将A的指针指向C,保持B的指针,然后删除程序将进入宽限期检测。由于B的内容并没有变更,读到B的线程仍然可以继续读取B的后续节点。B不能立即销毁,它必须等待宽限期结束后,才能进行相应销毁操作。由于A的节点已经指向了C,当宽限期开始之后所有的后续读操作通过A找到的是C,而B已经隐藏了,后续的读线程都不会读到它。这样就确保宽限期过后,删除B并不对系统造成影响。

二、RCU核心API

如果指针ptr指向被RCU保护的数据结构,直接反引用指针是被禁止的,首先必须调用rcu_dereference(ptr),然后反引用返回的结果,需要使用rcu_read_lock和rcu_read_unlock 调用来进行保护。

rcu_read_lock()

rcu_read_unlock()

synchronize_rcu()/call_rcu()

rcu_assign_pointer()

rcu_dereference()

1、rcu_read_lock()

void rcu_read_lock(void);

读者读取受RCU保护的数据结构时使用,通知回收者读者进入了RCU的读端临界区。在RCU读端临界区访问的任何受RCU保护的数据结构都会保证在临界区期间保持未回收状态。另外,引用计数可以与RCU一起使用,以维护对数据结构的长期引用。在RCU读侧临界区阻塞是非法的。rcu_read_lock的实现非常简单,是关闭抢占:

static inline void __rcu_read_lock(void)

{

preempt_disable();

}

2、rcu_read_unlock()

void rcu_read_unlock(void);

读者结束读取后使用,用于通知回收者其退出了读端临界区。RCU的读端临界区可能被嵌套或重叠。rcu_read_unlock 的实现是开发抢占。

static inline void __rcu_read_unlock(void)

{

preempt_enable();

}

3、synchronize_rcu()

void synchronize_rcu(void);

synchronize_rcu 函数的关键思想是等待。确保读者完成对旧结构体的操作后释放旧结构体。synchronize_rcu 的调用点标志着“更新者代码的结束”和“回收者代码的开始”。它通过阻塞来做到这一点,直到所有cpu上所有预先存在的RCU读端临界区都完成。

需要注意的是,synchronize_rcu()只需要等待调用它之前的读端临界区完成,不需要等待调用它之后开始的读取者完成。另外,synchronize_rcu()不一定在最后一个预先存在的RCU读端临界区完成之后立即返回。具体实现中可能会有延时调度。同时,为了提高效率,许多RCU实现请求批量处理,这可能会进一步延迟 synchronize_rcu() 的返回。

4、call_rcu()

call_rcu() API是syncnize_rcu()的回调形式,它注册而不是阻塞,而是注册一个函数和自变量,这些函数和自变量在所有正在进行的RCU读取侧关键部分均已完成之后被调用。 在禁止非法访问或更新端性能要求比较高时,此回调变体特别有用。

但是,不应轻易使用call_rcu() API,因为对syncnize_rcu() API的使用通常会使代码更简单。 此外,synchronize_rcu() API具有不错的属性,可以在宽限期被延迟时自动限制更新速率。 面对拒绝服务攻击,此属性导致系统具有弹性。 使用call_rcu()的代码应限制更新速率,以获得相同的弹性。

在上面的例子中,rcu_st_update阻塞直到一个宽限期结束。这很简单,但在某些情况下,人们不能等这么久——可能还有其他高优先级的工作要做。 在这种情况下,使用call_rcu()而不是synchronize_rcu()。call_rcu() API如下:

void call_rcu(struct rcu_head * head, void (*func)(struct rcu_head *head));

此函数在宽限期过后调用func(heda)。此调用可能发生在softirq或进程上下文中,因此不允许阻止该函数。rcu_st结构需要添加一个rcu-head结构,可能如下所示:

struct foo {

int a;

char b;

long c;

struct rcu_head rcu;

};

foo_update_a()函数示例如下:

/*

* Create a new struct foo that is the same as the one currently

* * pointed to by gbl_foo, except that field "a" is replaced

* * with "new_a". Points gbl_foo to the new structure, and

* * frees up the old structure after a grace period. *

* Uses rcu_assign_pointer() to ensure that concurrent readers

* * see the initialized version of the new structure.

* * Uses call_rcu() to ensure that any readers that might have

* * references to the old structure complete before freeing the * old structure.

* */

void foo_update_a(int new_a) {

struct foo *new_fp = NULL;

struct foo *old_fp = NULL;


new_fp = kmalloc(sizeof(*new_fp), GFP_KERNEL);


spin_lock(&foo_mutex);

old_fp = rcu_dereference_protected(gbl_foo, lockdep_is_held(&foo_mutex));


*new_fp = *old_fp;

new_fp->a = new_a;

rcu_assign_pointer(gbl_foo, new_fp);


spin_unlock(&foo_mutex);

/* 挂接释放函数 */

call_rcu(&old_fp->rcu, foo_reclaim);

}

// The foo_reclaim() function might appear as follows:

void foo_reclaim(struct rcu_head *rp)

{

struct foo *fp = container_of(rp, struct foo, rcu);

foo_cleanup(fp->a);

kfree(fp);

}

container_of() 原语是一个宏,给定指向结构的指针,结构的类型以及结构内的指向字段,该宏将返回指向结构开头的指针。

使用 call_rcu() 可使 foo_update_a() 的调用方立即重新获得控制权,而不必担心新近更新的元素的旧版本。 它还清楚地显示了更新程序 foo_update_a()和回收程序 foo_reclaim() 之间的RCU区别。

在从受RCU保护的数据结构中删除数据元素之后,请使用call_rcu()-以注册一个回调函数,该函数将在所有可能引用该数据项的RCU读取侧完成后调用。如果call_rcu()的回调除了在结构上调用kfree()之外没有做其他事情,则可以使用kfree_rcu()代替call_rcu()来避免编写自己的回调:kfree_rcu(old_fp,rcu)

5、rcu_assign_pointer()

原型:

void rcu_assign_pointer(p, typeof(p) v);

rcu_assign_pointer()通过宏实现。将新指针赋给RCU结构体,赋值前的读者看到的还是旧的指针。

更新者使用这个函数为受rcu保护的指针分配一个新值,以便安全地将更新的值更改传递给读者。 此宏不计算rvalue,但它执行某CPU体系结构所需的内存屏障指令。保证内存屏障前的指令一定会先于内存屏障后的指令被执行。

它用于记录

(1)哪些指针受RCU保护以及

(2)给定结构可供其他CPU访问的点。 rcu_assign_pointer()最常通过_rcu列表操作原语(例如list_add_rcu())间接使用。

6、rcu_dereference()

原型:

typeof(p) rcu_dereference(p);

与rcu_assign_pointer()类似,rcu_dereference()也必须通过宏实现。

读者通过rcu_dereference()获取受保护的RCU指针,该指针返回一个可以安全解除引用的值。 请注意,rcu_dereference()实际上并未取消对指针的引用,相反,它保护指针供以后取消引用。 它还针对给定的CPU体系结构执行任何所需的内存屏障指令。

常见的编码实践是使用rcu_dereference() 将一个受rcu保护的指针复制到一个局部变量,然后解引用这个局部变量,例如:

p = rcu_dereference(head.next);

return p->data;

然而,上述情况可以整合成如下一句:

return rcu_dereference(head.next)->data;

7、内核常见使用实例

rcu_read_lock();

list_for_each_entry_rcu(pos, head, member) {

// do something with `pos`

}

rcu_read_unlock();

/* p 指向一块受 RCU 保护的共享数据 */

/* reader */

rcu_read_lock();

p1 = rcu_dereference(p);

if (p1 != NULL) {

printk("%d ", p1->field);

}

rcu_read_unlock();

/* free the memory */

p2 = p;

if (p2 != NULL) {

p = NULL;

synchronize_rcu();

kfree(p2);

}

展开阅读全文

页面更新:2024-03-29

标签:机制   临界   节点   线程   指针   内核   函数   原理   读者   操作   数据   宽限期

1 2 3 4 5

上滑加载更多 ↓
推荐阅读:
友情链接:
更多:

本站资料均由网友自行发布提供,仅用于学习交流。如有版权问题,请与我联系,QQ:4156828  

© CopyRight 2020-2024 All Rights Reserved. Powered By 71396.com 闽ICP备11008920号-4
闽公网安备35020302034903号

Top