假设一个进程中包含多个线程,这些线程共享变量 x,我们希望某个(或某些)线程等待 "x==10' 条件成立后再执行后续的代码,该如何实现呢?
您可能想到用 while 循环实现,例如:
void* threadFun(void * args){
while(x != 10){
sleep(5);
}
// 待条件成立后,执行后续的代码
}
当线程执行此函数时,会判断 x 的值是否等于 10,如果不等则间隔 5 秒后再重复判断,直到 x 的值等于 10 ,线程才能执行后续的代码。
直观上看,while 循环确实能够阻塞线程,但这种方法存在严重的效率问题。当线程因条件不成立进入等待状态时,如果此时恰好有另一个线程将 x 的值改为 10,该线程必须等待 5 秒后才能继续执行。如果我们将等待时间缩短(或者直接将 sleep(5) 注释掉),线程将反复判断 x 的值是否等于 10,它可能会一直霸占着 CPU 资源,导致其它线程无法执行,x 变量的值会出现“长时间不改变”的情况。
针对类似的场景,我们推荐您用条件变量来实现。和互斥锁、信号量类似,条件变量本质也是一个全局变量,它的功能是阻塞线程,直至接收到“条件成立”的信号后,被阻塞的线程才能继续执行。
一个条件变量可以阻塞多个线程,这些线程会组成一个等待队列。当条件成立时,条件变量可以解除线程的“被阻塞状态”。也就是说,条件变量可以完成以下两项操作:
为了避免多线程之间发生“抢夺资源”的问题,条件变量在使用过程中必须和一个互斥锁搭配使用。如果您不了解互斥锁或者对互斥锁一知半解,请先阅读《Linux互斥锁实现线程同步》一文。
POSIX 标准中,条件变量用 pthread_cond_t 类型的变量表示,此类型定义在<pthread.h>头文件中。举个例子:
#include <pthread.h>
pthread_cond_t myCond;
由此,我们就成功创建了一个条件变量。要想使用 myCond 条件变量,还需要进行初始化操作。
初始化条件变量的方式有两种,一种是直接将 PTHREAD_COND_INITIALIZER 赋值给条件变量,例如:
还可以借助 pthread_cond_init() 函数初始化条件变量,语法格式如下:
参数 cond 用于指明要初始化的条件变量;参数 attr 用于自定义条件变量的属性,通常我们将它赋值为 NULL,表示以系统默认的属性完成初始化操作。
pthread_cond_init() 函数初始化成功时返回数字 0,反之函数返回非零数。
当 attr 参数为 NULL 时,以上两种初始化方式完全等价。
当条件不成立时,条件变量可以阻塞当前线程,所有被阻塞的线程会构成一个等待队列。
阻塞线程可以借助以下两个函数实现:
cond 参数表示已初始化好的条件变量;mutex 参数表示与条件变量配合使用的互斥锁;abstime 参数表示阻塞线程的时间。
注意,abstime 参数指的是绝对时间,例如您打算阻塞线程 5 秒钟,那么首先要得到当前系统的时间,然后再加上 5 秒,最终得到的时间才是传递的实参值。
调用两个函数之前,我们必须先创建好一个互斥锁并完成“加锁”操作,然后才能作为实参传递给 mutex 参数。两个函数会完成以下两项工作:
也就是说,函数尚未接收到“条件成立”的信号之前,它将一直阻塞线程执行。注意,当函数接收到“条件成立”的信号后,它并不会立即结束对线程的阻塞,而是先完成对互斥锁的“加锁”操作,然后才解除阻塞。
两个函数都以“原子操作”的方式完成“阻塞线程+解锁”或者“重新加锁+解除阻塞”这两个过程。所谓“原子操作”,即当有多个线程执行相同的某个过程时,虽然它们都会访问互斥锁和条件变量,但之间不会相互干扰。
以上两个函数都能用来阻塞线程,它们的区别在于:pthread_cond_wait() 函数可以永久阻塞线程,直到条件变量成立的那一刻;pthread_cond_timedwait() 函数只能在 abstime 参数指定的时间内阻塞线程,超出时限后,该函数将重新对互斥锁执行“加锁”操作,并解除对线程的阻塞,函数的返回值为 ETIMEDOUT。
如果函数成功接收到了“条件成立”的信号,重新对互斥锁完成了“加锁”并使线程继续执行,函数返回数字 0,反之则返回非零数。
POSIX 标准规定,pthread_cond_wait() 和 pthread_cond_timedwait() 函数是可以作为“取消点”的函数。当线程接收到“强制终止执行”的信号后,执行到这两个函数时,线程就会终止执行。有关强制终止执行线程和“取消点”的具体含义,您可以阅读《线程间相互终止执行,这个坑千万别踩!》一文。
对于被 pthread_cond_wait() 或 pthread_cond_timedwait() 函数阻塞的线程,我们可以借助如下两个函数向它们发送“条件成立”的信号,解除它们的“被阻塞”状态:
cond 参数表示初始化好的条件变量。当函数成功解除线程的“被阻塞”状态时,返回数字 0,反之返回非零数。
两个函数都能解除线程的“被阻塞”状态,区别在于:
由于互斥锁的存在,解除阻塞后的线程也不一定能立即执行。当互斥锁处于“加锁”状态时,解除阻塞状态的所有线程会组成等待互斥锁资源的队列,等待互斥锁“解锁”。
对于初始化好的条件变量,我们可以调用 pthread_cond_destory() 函数销毁它。
pthread_cond_destory() 函数的语法格式如下:
cond 参数表示要销毁的条件变量。如果函数成功销毁 cond 参数指定的条件变量,返回数字 0,反之返回非零数。
值得一提的是,销毁后的条件变量还可以调用 pthread_cond_init() 函数重新初始化后使用。
接下来,通过一个实例给您演示条件变量的具体用法。
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
//初始化互斥锁
pthread_mutex_t myMutex = PTHREAD_MUTEX_INITIALIZER;
//初始化条件变量
pthread_cond_t myCond = PTHREAD_COND_INITIALIZER;
//设置全局变量
int x = 0;
//线程执行的函数
void * waitForTrue(void *args) {
int res;
//条件变量阻塞线程之前,先对互斥锁执行“加锁”操作
res = pthread_mutex_lock(&myMutex);
if (res != 0) {
printf("waitForTrue 加锁失败\n");
return NULL;
}
printf("------等待 x 的值为 10\n");
if (pthread_cond_wait(&myCond, &myMutex) == 0) {
printf("x = %d\n", x);
}
//最终将互斥锁解锁
pthread_mutex_unlock(&myMutex);
return NULL;
}
//线程执行的函数
void * doneForTrue(void *args) {
int res;
while (x != 10) {
//对互斥锁执行“加锁”操作
res = pthread_mutex_lock(&myMutex);
if (res == 0) {
x++;
printf("doneForTrue:x = %d\n", x);
sleep(1);
//对互斥锁“解锁”
pthread_mutex_unlock(&myMutex);
}
}
//发送“条件成立”的信号,解除 mythread1 线程的“被阻塞”状态
res = pthread_cond_signal(&myCond);
if (res != 0) {
printf("解除阻塞失败\n");
}
return NULL;
}
int main() {
int res;
pthread_t mythread1, mythread2;
res = pthread_create(&mythread1, NULL, waitForTrue, NULL);
if (res != 0) {
printf("mythread1线程创建失败\n");
return 0;
}
res = pthread_create(&mythread2, NULL, doneForTrue, NULL);
if (res != 0) {
printf("mythread2线程创建失败\n");
return 0;
}
//等待 mythread1 线程执行完成
res = pthread_join(mythread1, NULL);
if (res != 0) {
printf("1:等待线程失败\n");
}
//等待 mythread2 线程执行完成
res = pthread_join(mythread2, NULL);
if (res != 0) {
printf("2:等待线程失败\n");
}
//销毁条件变量
pthread_cond_destroy(&myCond);
return 0;
}
假设程序编写在 thread.c 文件中,执行过程如下:
程序中共创建了 2 个线程 mythread1 和 mythread2,其中 mythread1 线程借助条件变量实现了“直到变量 x 的值为 10 时,才继续执行后续代码”的功能,mythread1 线程用于将 x 的变量修改为 10,同时向 mythread1 线程发送“条件成立”的信号,唤醒 mythread1 线程并继续执行。