You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

484 lines
23 KiB
Markdown

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# 08 | 锁:并发操作中,解决数据同步的四种方法
你好我是LMOS。
我们在前面的课程中探索了开发操作系统要了解的最核心的硬件——CPU、MMU、Cache、内存知道了它们的工作原理。在程序运行中它们起到了至关重要的作用。
在开发我们自己的操作系统以前,还不能一开始就把机器跑起来,而是先要弄清楚数据同步的问题。如果不解决掉数据同步的问题,后面机器跑起来,就会出现很多不可预知的结果。
通过这节课,我会给你讲清楚为什么在并发操作里,很可能得不到预期的访问数据,还会带你分析这个问题的原因以及解决方法。有了这样一个研究、解决问题的过程,对最重要的几种锁(**原子变量,关中断,信号量,自旋锁**),你就能做到心中有数了。
## 非预期结果的全局变量
来看看下面的代码描述的是一个线程中的函数和中断处理函数它们分别对一个全局变量执行加1操作代码如下。
```
int a = 0;
void interrupt_handle()
{
a++;
}
void thread_func()
{
a++;
}
```
首先我们梳理一下编译器的翻译过程通常编译器会把a++语句翻译成这3条指令。
1.把a加载某个寄存器中。
2.这个寄存器加1。
3.把这个寄存器写回内存。
那么不难推断可能导致结果不确定的情况是这样的thread\_func函数还没运行完第2条指令时中断就来了。
因此CPU转而处理中断也就是开始运行interrupt\_handle函数这个函数运行完a=1CPU还会回去继续运行第3条指令此时a依然是1这显然是错的。
下面来看一下表格,你就明白了。
![](https://static001.geekbang.org/resource/image/79/4c/79bfa1d036ebb27yy17ae3edf768ba4c.jpeg?wh=1920*1080)
显然在t2时刻发生了中断导致了t2到t4运行了interrupt\_handle函数t5时刻thread\_func又恢复运行导致interrupt\_handle函数中a的操作丢失因此出错。
## 方法一:原子操作 拿下单体变量
要解决上述场景中的问题,有这样两种思路。一种是**把a++变成原子操作**这里的原子是不可分隔的也就是说要a++这个操作不可分隔即a++要么不执行,要么一口气执行完;另一种就是**控制中断**比如在执行a++之前关掉中断,执行完了之后打开中断。
我们先来看看原子操作,显然靠编译器自动生成原子操作不太可能。第一,编译器没有这么智能,能检测哪个变量需要原子操作;第二,编译器必须要考虑代码的移植性,例如有些硬件平台支持原子操作的机器指令,有的硬件平台不支持原子操作。
既然实现原子操作无法依赖于具体编译器那就需要我们自己动手x86平台支持很多原子指令我们只需要直接应用这些指令比如原子加、原子减原子读写等用汇编代码写出对应的原子操作函数就行了。
好在现代C语言已经支持嵌入汇编代码可以**在C函数中按照特定的方式嵌入汇编代码**了,实现原子操作就更方便了,代码如下。
```
//定义一个原子类型
typedef struct s_ATOMIC{
volatile s32_t a_count; //在变量前加上volatile是为了禁止编译器优化使其每次都从内存中加载变量
}atomic_t;
//原子读
static inline s32_t atomic_read(const atomic_t *v)
{
//x86平台取地址处是原子
return (*(volatile u32_t*)&(v)->a_count);
}
//原子写
static inline void atomic_write(atomic_t *v, int i)
{
//x86平台把一个值写入一个地址处也是原子的
v->a_count = i;
}
//原子加上一个整数
static inline void atomic_add(int i, atomic_t *v)
{
__asm__ __volatile__("lock;" "addl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//原子减去一个整数
static inline void atomic_sub(int i, atomic_t *v)
{
__asm__ __volatile__("lock;" "subl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//原子加1
static inline void atomic_inc(atomic_t *v)
{
__asm__ __volatile__("lock;" "incl %0"
: "+m" (v->a_count));
}
//原子减1
static inline void atomic_dec(atomic_t *v)
{
__asm__ __volatile__("lock;" "decl %0"
: "+m" (v->a_count));
}
```
**以上代码中加上lock前缀的addl、subl、incl、decl指令都是原子操作lock前缀表示锁定总线。**
我们还是来看看GCC支持嵌入汇编代码的模板不同于其它C编译器支持嵌入汇编代码的方式为了优化用户代码GCC设计了一种特有的嵌入方式它规定了汇编代码嵌入的形式和嵌入汇编代码需要由哪几个部分组成如下面代码所示。
```
__asm__ __volatile__(代码部分:输出部分列表: 输入部分列表:损坏部分列表);
```
可以看到代码模板从\_\_asm\_\_开始当然也可以是asm紧跟着\_\_volatile\_\_然后是跟着一对括号最后以分号结束。括号里大致分为4个部分
1.汇编代码部分,这里是实际嵌入的汇编代码。
2.输出列表部分让GCC能够处理C语言左值表达式与汇编代码的结合。
3.输入列表部分也是让GCC能够处理C语言表达式、变量、常量让它们能够输入到汇编代码中去。
4.损坏列表部分告诉GCC汇编代码中用到了哪些寄存器以便GCC在汇编代码运行前生成保存它们的代码并且在生成的汇编代码运行后恢复它们寄存器的代码。
它们之间用冒号隔开如果只有汇编代码部分后面的冒号可以省略。但是有输入列表部分而没有输出列表部分的时候输出列表部分的冒号就必须要写否则GCC没办法判断同样的道理对于其它部分也一样。
这里不会过多展开讲这个技术,详情可参阅[GCC手册](https://www.gnu.org/manual/manual.html)。你可以重点看GAS相关的章节。
下面将用上面一个函数atomic\_add为例子说一下如下所示。
```
static inline void atomic_add(int i, atomic_t *v)
{
__asm__ __volatile__("lock;" "addl %1,%0"
: "+m" (v->a_count)
: "ir" (i));
}
//"lock;" "addl %1,%0" 是汇编指令部分,%1,%0是占位符它表示输出、输入列表中变量或表态式占位符的数字从输出部分开始依次增加这些变量或者表态式会被GCC处理成寄存器、内存、立即数放在指令中。
//: "+m" (v->a_count) 是输出列表部分,“+m”表示(v->a_count)和内存地址关联
//: "ir" (i) 是输入列表部分“ir” 表示i是和立即数或者寄存器关联
```
有了这些原子操作函数之后 ,前面场景中的代码就变成下面这样了:无论有没有中断,或者什么时间来中断,都不会出错。
```
atomic_t a = {0};
void interrupt_handle()
{
atomic_inc(&a);
}
void thread_func()
{
atomic_inc(&a);
}
```
好,说完了原子操作,我们再看看怎么用中断控制的思路解决数据并发访问的问题。
## 方法二:中断控制 搞定复杂变量
中断是CPU响应外部事件的重要机制时钟、键盘、硬盘等IO设备都是通过发出中断来请求CPU执行相关操作的即执行相应的中断处理代码比如下一个时钟到来、用户按下了键盘上的某个按键、硬盘已经准备好了数据。
但是中断处理代码中如果操作了其它代码的数据,这就需要相应的控制机制了,这样才能保证在操作数据过程中不发生中断。
你或许在想,可以用原子操作啊?不过,**原子操作只适合于单体变量**,如整数。操作系统的数据结构有的可能有几百字节大小,其中可能包含多种不同的基本数据类型。这显然用原子操作无法解决。
下面我们就要写代码实现关闭开启、中断了x86 CPU上关闭、开启中断有专门的指令即cli、sti指令它们主要是对CPU的eflags寄存器的**IF位**第9位进行清除和设置CPU正是通过此位来决定是否响应中断信号。这两条指令只能Ring0权限才能执行代码如下。
```
//关闭中断
void hal_cli()
{
__asm__ __volatile__("cli": : :"memory");
}
//开启中断
void hal_sti()
{
__asm__ __volatile__("sti": : :"memory");
}
//使用场景
void foo()
{
hal_cli();
//操作数据……
hal_sti();
}
void bar()
{
hal_cli();
//操作数据……
hal_sti();
}
```
你可以自己思考一下,前面这段代码效果如何?
它看似完美地解决了问题其实有重大缺陷hal\_cli()hal\_sti()**无法嵌套使用**,看一个例子你就明白了,代码如下。
```
void foo()
{
hal_cli();
//操作数据第一步……
hal_sti();
}
void bar()
{
hal_cli();
foo();
//操作数据第二步……
hal_sti();
}
```
上面代码的关键问题在bar函数在关中断下调用了foo函数foo函数中先关掉中断处理好数据然后开启中断回到bar函数中bar函数还天真地以为中断是关闭的接着处理数据以为不会被中断抢占。
那么怎么解决上面的问题呢?我们只要修改一下开启、关闭中断的函数就行了。
我们可以这样操作在关闭中断函数中先保存eflags寄存器然后执行cli指令在开启中断函数中直接恢复之前保存的eflags寄存器就行了具体代码如下。
```
typedef u32_t cpuflg_t;
static inline void hal_save_flags_cli(cpuflg_t* flags)
{
__asm__ __volatile__(
"pushfl \t\n" //把eflags寄存器压入当前栈顶
"cli \t\n" //关闭中断
"popl %0 \t\n"//把当前栈顶弹出到flags为地址的内存中
: "=m"(*flags)
:
: "memory"
);
}
static inline void hal_restore_flags_sti(cpuflg_t* flags)
{
__asm__ __volatile__(
"pushl %0 \t\n"//把flags为地址处的值寄存器压入当前栈顶
"popfl \t\n" //把当前栈顶弹出到eflags寄存器中
:
: "m"(*flags)
: "memory"
);
}
```
从上面的代码中不难发现硬件工程师早就想到了如何解决在嵌套函数中关闭、开启中断的问题pushfl指令把eflags寄存器压入当前栈顶popfl把当前栈顶的数据弹出到eflags寄存器中。
hal\_restore\_flags\_sti()函数的执行是否开启中断完全取决于上一次eflags寄存器中的值并且popfl指令只会影响eflags寄存器中的IF位。这样无论函数嵌套调用多少层都没有问题。
## 方法三:自旋锁 协调多核心CPU
前面说的控制中断看似解决了问题那是因为以前是单CPU同一时刻只有一条代码执行流除了中断会中止当前代码执行流转而运行另一条代码执行流中断处理程序再无其它代码执行流。这种情况下只要控制了中断就能安全地操作全局数据。
但是我们都知道现在情况发生了改变CPU变成了多核心或者主板上安装了多颗CPU同一时刻下系统中存在多条代码执行流控制中断只能控制本地CPU的中断无法控制其它CPU核心的中断。
所以,原先通过控制中断来维护全局数据安全的方案失效了,这就需要全新的机制来处理这样的情况,于是就轮到自旋锁登场了。
我们先看看自旋锁的原理,它是这样的:首先读取锁变量,判断其值是否已经加锁,如果未加锁则执行加锁,然后返回,表示加锁成功;如果已经加锁了,就要返回第一步继续执行后续步骤,因而得名自旋锁。为了让你更好理解,下面来画一个图描述这个算法。
![](https://static001.geekbang.org/resource/image/61/88/619c27c6400344db2310fb82ce8d5788.jpg?wh=2195*2405 "自旋锁原理示意图")
这个算法看似很好,但是想要正确执行它,就**必须保证读取锁变量和判断并加锁的操作是原子执行的。**否则CPU0在读取了锁变量之后CPU1读取锁变量判断未加锁执行加锁然后CPU0也判断未加锁执行加锁这时就会发现两个CPU都加锁成功因此这个算法出错了。
怎么解决这个问题呢这就要找硬件要解决方案了x86 CPU给我们提供了一个原子交换指令xchg它可以让寄存器里的一个值跟内存空间中的一个值做交换。例如让eax=memlockmemlock=eax这个动作是原子的不受其它CPU干扰。
下面我们就去实现自旋锁,代码如下所示。
```
//自旋锁结构
typedef struct
{
volatile u32_t lock;//volatile可以防止编译器优化保证其它代码始终从内存加载lock变量的值
} spinlock_t;
//锁初始化函数
static inline void x86_spin_lock_init(spinlock_t * lock)
{
lock->lock = 0;//锁值初始化为0是未加锁状态
}
//加锁函数
static inline void x86_spin_lock(spinlock_t * lock)
{
__asm__ __volatile__ (
"1: \n"
"lock; xchg %0, %1 \n"//把值为1的寄存器和lock内存中的值进行交换
"cmpl $0, %0 \n" //用0和交换回来的值进行比较
"jnz 2f \n" //不等于0则跳转后面2标号处运行
"jmp 3f \n" //若等于0则跳转后面3标号处返回
"2: \n"
"cmpl $0, %1 \n"//用0和lock内存中的值进行比较
"jne 2b \n"//若不等于0则跳转到前面2标号处运行继续比较
"jmp 1b \n"//若等于0则跳转到前面1标号处运行交换并加锁
"3: \n" :
: "r"(1), "m"(*lock));
}
//解锁函数
static inline void x86_spin_unlock(spinlock_t * lock)
{
__asm__ __volatile__(
"movl $0, %0\n"//解锁把lock内存中的值设为0就行
:
: "m"(*lock));
}
```
上述代码的中注释已经很清楚了关键点在于xchg指令xchg %0, %1 。
其中,%0对应 “r”(1)表示由编译器自动分配一个通用寄存器并填入值1例如mov eax1。而%1对应"m"(\*lock)表示lock是内存地址。把1和内存中的值进行交换若内存中是1则不会影响因为本身写入就是1若内存中是0一交换内存中就变成了1即加锁成功。
自旋锁依然有中断嵌套的问题,也就是说,在使用自旋锁的时候我们仍然要注意中断。
在中断处理程序访问某个自旋锁保护的某个资源时,依然有问题,所以我们要写的自旋锁函数必须适应这样的中断环境,也就是说,它需要在处理中断的过程中也能使用,如下所示。
```
static inline void x86_spin_lock_disable_irq(spinlock_t * lock,cpuflg_t* flags)
{
__asm__ __volatile__(
"pushfq \n\t"
"cli \n\t"
"popq %0 \n\t"
"1: \n\t"
"lock; xchg %1, %2 \n\t"
"cmpl $0,%1 \n\t"
"jnz 2f \n\t"
"jmp 3f \n"
"2: \n\t"
"cmpl $0,%2 \n\t"
"jne 2b \n\t"
"jmp 1b \n\t"
"3: \n"
:"=m"(*flags)
: "r"(1), "m"(*lock));
}
static inline void x86_spin_unlock_enabled_irq(spinlock_t* lock,cpuflg_t* flags)
{
__asm__ __volatile__(
"movl $0, %0\n\t"
"pushq %1 \n\t"
"popfq \n\t"
:
: "m"(*lock), "m"(*flags));
}
```
以上代码实现了关中断下获取自旋锁,以及恢复中断状态释放自旋锁。在中断环境下也完美地解决了问题。
## 方法四:信号量 CPU时间管理大师
无论是原子操作还是自旋锁都不适合长时间等待的情况因为有很多资源数据它有一定的时间性你想去获取它CPU并不能立即返回给你而是要等待一段时间才能把数据返回给你。这种情况你用自旋锁来同步访问这种资源你会发现这是对CPU时间的巨大浪费。
下面我们看看另一种同步机制既能对资源数据进行保护同一时刻只有一个代码执行流访问又能在资源无法满足的情况下让CPU可以执行其它任务。
如果你翻过操作系统的理论书应该对信号量这个词并不陌生。信号量是1965年荷兰学者Edsger Dijkstra提出的是一种用于资源互斥或者进程间同步的机制。这里我们就来看看如何实现这一机制。
你不妨想象这样一个情境:微信等待你从键盘上的输入信息,然后把这个信息发送出去。
这个功能我们怎么实现呢?下面我们就来说说实现它的一般方法,当然具体实现中可能不同,但是原理是相通的,具体如下。
1.一块内存,相当于缓冲区,用于保存键盘的按键码。
2.需要一套控制机制,比如微信读取这个缓冲区,而该缓冲区为空时怎么处理;该缓冲区中有了按键码,却没有代码执行流来读取,又该怎么处理。
我们期望是这样的,一共有三点。
1.当微信获取键盘输入信息时,发现键盘缓冲区中是空的,就进入等待状态。
2.同一时刻,只能有一个代码执行流操作键盘缓冲区。
3.当用户按下键盘时,我们有能力把按键码写入缓冲区中,并且能看一看微信或者其它程序是否在等待该缓冲区,如果是就重新激活微信和其它的程序,让它们重新竞争读取键盘缓冲区,如果竞争失败依然进入等待状态。
其实以上所述无非是三个问题:**等待、互斥、唤醒(即重新激活等待的代码执行流)。**
这就需要一种全新的数据结构来解决这些问题。根据上面的问题这个数据结构至少需要一个变量来表示互斥比如大于0则代码执行流可以继续运行等于0则让代码执行流进入等待状态。还需要一个等待链用于保存等待的代码执行流。
这个数据结构的实现代码如下所示。
```
#define SEM_FLG_MUTEX 0
#define SEM_FLG_MULTI 1
#define SEM_MUTEX_ONE_LOCK 1
#define SEM_MULTI_LOCK 0
//等待链数据结构,用于挂载等待代码执行流(线程)的结构,里面有用于挂载代码执行流的链表和计数器变量,这里我们先不深入研究这个数据结构。
typedef struct s_KWLST
{
spinlock_t wl_lock;
uint_t wl_tdnr;
list_h_t wl_list;
}kwlst_t;
//信号量数据结构
typedef struct s_SEM
{
spinlock_t sem_lock;//维护sem_t自身数据的自旋锁
uint_t sem_flg;//信号量相关的标志
sint_t sem_count;//信号量计数值
kwlst_t sem_waitlst;//用于挂载等待代码执行流(线程)结构
}sem_t;
```
搞懂了信号量的结构,我们再来看看信号量的一般用法,注意信号量在使用之前需要**先进行初始化**。这里假定信号量数据结构中的sem\_count初始化为1sem\_waitlst等待链初始化为空。
使用信号量的步骤,我已经给你列好了。
**第一步,获取信号量。**
1.首先对用于保护信号量自身的自旋锁sem\_lock进行加锁。
2.对信号值sem\_count执行“减1”操作并检查其值是否小于0。
3.上步中检查sem\_count如果小于0就让进程进入等待状态并且将其挂入sem\_waitlst中然后调度其它进程运行。否则表示获取信号量成功。当然最后别忘了对自旋锁sem\_lock进行解锁。
**第二步,代码执行流开始执行相关操作,例如读取键盘缓冲区。**
**第三步,释放信号量。**
1.首先对用于保护信号量自身的自旋锁sem\_lock进行加锁。
2.对信号值sem\_count执行“加1”操作并检查其值是否大于0。
3.上步中检查sem\_count值如果大于0就执行唤醒sem\_waitlst中进程的操作并且需要调度进程时就执行进程调度操作不管sem\_count是否大于0通常会大于0都标记信号量释放成功。当然最后别忘了对自旋锁sem\_lock进行解锁。
这里我给你额外分享一个小技巧,**写代码之前我们常常需要先想清楚算法步骤,建议你像我这样分条列出,因为串联很容易含糊其辞,不利于后面顺畅编码。**
下面我们来看看实现上述这些功能的代码按照理论书籍上说信号量有两个操作downup代码如下。
```
//获取信号量
void krlsem_down(sem_t* sem)
{
cpuflg_t cpufg;
start_step:
krlspinlock_cli(&sem->sem_lock,&cpufg);
if(sem->sem_count<1)
{//如果信号量值小于1,则让代码执行流(线程)睡眠
krlwlst_wait(&sem->sem_waitlst);
krlspinunlock_sti(&sem->sem_lock,&cpufg);
krlschedul();//切换代码执行流下次恢复执行时依然从下一行开始执行所以要goto开始处重新获取信号量
goto start_step;
}
sem->sem_count--;//信号量值减1,表示成功获取信号量
krlspinunlock_sti(&sem->sem_lock,&cpufg);
return;
}
//释放信号量
void krlsem_up(sem_t* sem)
{
cpuflg_t cpufg;
krlspinlock_cli(&sem->sem_lock,&cpufg);
sem->sem_count++;//释放信号量
if(sem->sem_count<1)
{//如果小于1,则说数据结构出错了,挂起系统
krlspinunlock_sti(&sem->sem_lock,&cpufg);
hal_sysdie("sem up err");
}
//唤醒该信号量上所有等待的代码执行流(线程)
krlwlst_allup(&sem->sem_waitlst);
krlspinunlock_sti(&sem->sem_lock,&cpufg);
krlsched_set_schedflgs();
return;
}
```
上述代码中的krlspinlock\_clikrlspinunlock\_sti两个函数只是对前面自旋锁函数的一个**封装**krlschedul、krlwlst\_wait、krlwlst\_allup、krlsched\_set\_schedflgs这几个函数会在进程相关课程进行探讨。
## 重点回顾
又到了这节课结束的时候,我们回顾一下今天都讲了什么。我把这节课的内容为你梳理一下,要点如下。
1.原子变量,在只有**单个变量全局数据**的情况下这种变量非常实用如全局计数器、状态标志变量等。我们利用了CPU的原子指令实现了一组操作原子变量的函数。
2.中断的控制。当要操作的数据很多的情况下用原子变量就不适合了。但是我们发现在单核心的CPU同一时刻只有一个代码执行流除了响应中断导致代码执行流切换不会有其它条件会干扰全局数据的操作所以我们只要在操作全局数据时关闭或者开启中断就行了为此我们开发了控制中断的函数。
3.自旋锁。由于多核心的CPU出现控制中断已经失效了因为**系统中同时有多个代码执行流**,为了解决这个问题,我们开发了自旋锁,自旋锁要么一下子获取锁,要么循环等待最终获取锁。
4.信号量。如果长时间等待后才能获取数据在这样的情况下前面中断控制和自旋锁都不能很好地解决于是我们开发了信号量。信号量由一套数据结构和函数组成它能使获取数据的代码执行流进入睡眠然后在相关条件满足时被唤醒这样就能让CPU能有时间处理其它任务。所以信号量同时解决了三个问题**等待、互斥、唤醒。**
## 思考题
请用代码展示一下自旋锁或者信号量,可能的使用形式是什么样的?
期待你在留言区的分享,也欢迎你把这节课的内容分享给身边的朋友,跟他一起学习交流。
我是LMOS我们下节课见