在 C++11 标准之前,C++ 语言并没有对并行编程提供语言级别的支持,C++ 使用的多线程都由第三方库提供,如 POSIX 标准(pthread)、OpenMG 库或 Windows线程库,它们都是基于过程的多线程,这使得 C++ 并行编程在可移植性方面存在诸多不足。
为此,C++11 标准增加了线程及线程相关的类,用于支持并行编程,极大地提高了 C++ 并行编程的可移植性。
C++11 标准提供了 thread 类模板用于创建线程,该类模板定义在 thread 标准库中,因此在创建线程时,需要包含 thread 头文件。
thread 类模板定义了一个无参构造函数和一个变参构造函数,因此在创建线程对象时,可以为线程传入参数,也可以不传入参数。
注意,thread 类模板不提供拷贝构造函数、赋值运算符重载等函数,因此线程对象之间不可以进行拷贝、赋值等操作。
除了构造函数,thread 类模板还定义了两个常用的成员函数:join() 函数和 detach() 函数。
该函数将线程和线程对象连接起来,即将子线程加入程序执行。join() 函数是阻塞的,它可以阻塞主线程(当前线程),等待子线程工作结束之后,再启动主线程继续执行任务。
该函数分离线程与线程对象,即主线程和子线程可同时进行工作,主线程不必等待子线程结束。但是,detach() 函数分离的线程对象不能再调用 join() 函数将它与线程连接起来。
【示例1】下面通过案例演示 C++11 标准中线程的创建与使用,代码如下:
#include<iostream>
#include<thread> //包含头文件
using namespace std;
void func() //定义函数func()
{
cout << "子线程工作" << endl;
cout << "子线程工作结束" << endl;
}
int main()
{
cout << "主线程工作" << endl;
thread t(func); //创建线程对象t
t.join(); //将子线程加入程序执行
cout << "主线程工作结束" << endl;
return 0;
}
运行结果:
主线程工作
子线程工作
子线程工作结束
主线程工作结束
示例分析:
在 C++ 多线程中,线程对象与线程是相互关联的,线程对象出了作用域之后就会被析构,如果此时线程函数还未执行完,程序就会发生错误,因此需要保证线程函数的生命周期在线程对象生命周期之内。
一般通过调用 thread 中定义的 join() 函数阻塞主线程,等待子线程结束,或者调用 thread 中的 detach() 函数将线程与线程对象进行分离,让线程在后台执行,这样即使线程对象生命周期结束,线程也不会受到影响。
例如,在【示例1】中,将 join() 函数替换为 detach() 函数,将线程对象与线程分离,让线程在后台运行,再次运行程序,运行结果就可能发生变化。即使 main() 函数(主线程)结束,子线程对象 t 生命周期结束,子线程依然会在后台将 func() 函数执行完毕。
C++11 标准定义了 this_thread 命名空间,该空间提供了一组获取当前线程信息的函数,分别如下所示:
在并行编程中,为避免多线程对共享资源的竞争导致程序错误,线程会对共享资源进行保护。通常的做法是对共享资源上锁,当线程修改共享资源时,会获取锁将共享资源锁上,在操作完成之后再进行解锁。
加锁之后,共享资源只能被当前线程操作,其他线程只能等待当前线程解锁退出之后再获取资源。为此,C++11 标准提供了互斥锁 mutex,用于为共享资源加锁,让多个线程互斥访问共享资源。
mutex 是一个类模板,定义在 mutex 标准库中,使用时要包含 mutex 头文件。
mutex 类模板定义了三个常用的成员函数:lock() 函数、unlock() 函数和 try_lock() 函数,用于实现上锁、解锁功能。下面分别介绍这三个函数。
lock() 函数用于给共享资源上锁。如果共享资源已经被其他线程上锁,则当前线程被阻塞;如果共享资源已经被当前线程上锁,则产生死锁。
unlock() 函数用于给共享资源解锁,释放当前线程对共享资源的所有权。
try_lock() 函数也用于给共享资源上锁,但它是尝试上锁,如果共享资源已经被其他线程上锁,try_lock() 函数返回false,当前线程并不会被阻塞,而是继续执行其他任务;如果共享资源已经被当前线程上锁,则产生死锁。
【示例2】下面通过案例演示 C++11 标准中 mutex 的上锁、解锁的过程,C++ 代码如下:
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;
int num = 0; //定义全局变量num
mutex mtx; //定义互斥锁mtx
void func()
{
mtx.lock(); //上锁
cout << "线程id: " << this_thread::get_id() << endl; //获取当前线程id
num++;
cout << "counter:" << num << endl;
mtx.unlock(); //解锁
}
int main()
{
thread ths[3]; //定义线程数组
for(int i = 0; i < 3; i++)
ths[i] = thread(func); //分配线程任务
for(auto& th : ths)
th.join(); //将线程加入程序
cout << "主线程工作结束" << endl;
return 0;
}
运行结果:
线程id: 11128 counter: 1 线程id: 10596 counter: 2 线程id: 9392 counter: 3 主线程工作结束
示例分析:
由运行结果可知:
如果注释掉第 9 行和第 13 行代码,即不给 func() 函数中的操作上锁,则三个线程会同时执行 func() 函数,输出的结果就会超出预期。例如,连续输出三个线程 id,或者先输出 counter 值为 3,再输出 counter 值为 1。
如果修改【示例2】,调用 try_lock() 函数为 func() 函数上锁,示例代码如下所示:
void func()
{
if (mtx.try_lock()) //调用try_lock()函数加锁
{
cout << "线程id: " << this_thread::get_id() << endl;
num++;
cout << "counter:" << num << endl;
mtx.unlock();
}
}
再次运行程序,只有一个线程执行 func() 函数。
当某个线程获取了互斥锁 mtx,就会为 func() 函数上锁,获得 func() 函数的执行权。另外两个线程调用 try_lock() 函数尝试上锁时,发现 func() 函数已经被其他线程上锁,这两个线程并没有被阻塞,而是继续执行其他任务(本案例中线程执行结束)。因此,最终只有一个线程执行 func()函数。
我们学习了互斥锁 mutex,通过 mutex 的成员函数为共享资源上锁、解锁,能够保证共享资源的安全性。但是,通过 mutex 上锁之后必须要手动解锁,如果忘记解锁,当前线程会一直拥有共享资源的所有权,其他线程不得访问共享资源,造成程序错误。
此外,如果程序抛出了异常,mutex 对象无法正确地析构,导致已经被上锁的共享资源无法解锁。为此,C++11 标准提供了 RAII 技术的类模板:lock_guard 和 unique_lock。
lock_guard 和 unique_lock 可以管理 mutex 对象,自动为共享资源上锁、解锁,不需要程序设计者手动调用 mutex 的 lock() 函数和 unlock() 函数。即使程序抛出异常,lock_guard 和 unique_lock 也能保证 mutex 对象正确解锁,在简化代码的同时,也保证了程序在异常情况下的安全性。
下面分别介绍 lock_guard 和 unique_lock。
lock_guard 可以管理一个 mutex 对象,在创建 lock_guard 对象时,传入 mutex 对象作为参数。
在 lock_guard 对象生命周期内,它所管理的 mutex 对象一直处于上锁状态;lock_guard 对象生命周期结束之后,它所管理的 mutex 对象也会被解锁。
【示例3】下面修改【示例2】来演示 lock_guard 的使用,C++ 代码如下:
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;
int num = 0; //定义全局变量num
mutex mtx; //定义互斥锁mtx
void func()
{
lock_guard<mutex> locker(mtx); //创建lock_guard对象locker
cout << "线程id: " << this_thread::get_id() << endl; //获取当前线程id
num++;
cout << "counter:" << num << endl;
}
int main()
{
thread ths[3]; //定义线程数组
for (int i = 0; i < 3; i++)
ths[i] = thread(func); //分配线程任务
for (auto& th : ths)
th.join(); //将线程加入程序
cout << "主线程工作结束" << endl;
return 0;
}
运行结果:
线程id: 12300 counter: 1 线程id: 13316 counter: 2 线程id: 2316 counter: 3 主线程工作结束
本例第 9 行代码创建了 lock_guard 对象 locker,传入互斥锁 mtx 作为参数,即对象 locker 管理互斥锁 mtx。
当线程执行 func() 函数时,locker 会自动完成对 func()函数的上锁、解锁功能。由运行结果可知,程序运行时,三个线程依旧是互斥执行 func() 函数。
注意,lock_guard 对象只是简化了 mutex 对象的上锁、解锁过程,但它并不负责 mutex 对象的生命周期。在【示例3】中,当 func() 函数执行结束时,lock_guard 对象 locker 析构,mutex 对象 mtx 自动解锁,线程释放 func() 函数的所有权,但对象 mtx 的生命周期并没有结束。
lock_guard 只定义了构造函数和析构函数,没有定义其他成员函数,因此它的灵活性太低。为了提高锁的灵活性,C++11 标准提供了另外一个 RAII 技术的类模板unique_lock。
unique_lock 与 lock_guard 相似,都可以很方便地为共享资源上锁、解锁,但 unique_lock 提供了更多的成员函数,它有多个重载的构造函数,而且 unique_lock对象支持移动构造和移动赋值。
注意,unique_lock 对象不支持拷贝和赋值。
下面简单介绍几个常用的成员函数:
正是因为提供了更多的成员函数,unique_lock 才能够更灵活地实现上锁和解锁控制,例如,转让 mutex 对象所有权(移动赋值)、在线程等待时期解锁等。但是,更灵活的代价就是空间开销也更大,运行效率相对较低。
在编程过程中,如果只是为了保证数据同步,那么 lock_guard 完全能够满足使用需求。如果除了同步,还需要结合条件变量进行线程阻塞,则要选择 unique_lock。
RAII(Resource Acquisition Is Initialization,资源获取初始化)是 C++ 语言管理资源、避免内存泄漏的一个常用技术。
RAII 技术利用 C++ 创建的对象最终被销毁的原则,在创建对象时获取对应的资源,在对象生命周期内控制对资源的访问,使资源始终有效。当对象生命周期结束后,释放资源。
在多线程编程中,多个线程可能会因为竞争资源而导致死锁,一旦产生死锁,程序将无法继续运行。为了解决死锁问题,C++11 标准引入了条件变量 condition_variable 类模板,用于实现线程间通信,避免产生死锁。
condition_variable 类模板定义了很多成员函数,用于实现进程通信的功能,下面介绍几个常用的成员函数。
wait() 函数会阻塞当前线程,直到其他线程调用唤醒函数将线程唤醒。当线程被阻塞时,wait() 函数会释放互斥锁,使得被阻塞在互斥锁上的其他线程能够获取互斥锁以继续执行代码。一旦当前线程被唤醒,它就会重新夺回互斥锁。
wait() 函数有两种重载形式,函数声明分别如下所示:
void wait(unique_lock<mutex>& lck);
template<class Predicate>
void wait(unique_lock<mutex>& lck, Predicate pred);
第一种重载形式称为无条件阻塞,它以 mutex 对象作为参数,在调用 wait() 函数阻塞当前线程时,wait() 函数会在内部自动通过 mutex 对象调用 unlock() 函数解锁,使得阻塞在互斥锁上的其他线程恢复执行。
第二种重载形式称为有条件阻塞,它有两个参数,第一个参数是 mutex 对象,第二个参数是一个条件,只有当条件为 false 时,调用 wait() 函数才能阻塞当前线程;在收到其他线程的通知后,只有当条件为 true 时,当前线程才能被唤醒。
wait_for() 函数也用于阻塞当前线程,但它可以指定一个时间段,当收到通知或超过时间段时,线程就会被唤醒。
wait_for() 函数声明如下所示:
cv_status wait_for(unique_lock<mutex>& lck, const chrono :: duration<Rep,Period>& rel_time);
在上述函数声明中,wait_for() 函数第一个参数为 unique_lock 对象,第二个参数为设置的时间段。
函数返回值为 cv_status 类型,cv_status 是 C++11 标准定义的枚举类型,它有两个枚举值:no-timeout 和 timeout。
no-timeout 表示没有超时,即在规定的时间段内,当前线程收到了通知;timeout 表示超时。
可以指定一个时间点,当收到通知或超过时间点时,线程就会被唤醒。
wait_until() 函数声明如下所示:
cv_status wait_until(unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_tim);
在上述函数声明中,wait_until() 函数第一个参数为 unique_lock 对象,第二个参数为设置的时间点。函数返回值为 cv_status 类型。
用于唤醒某个被阻塞的线程。如果当前没有被阻塞的线程,则该函数什么也不做;如果有多个被阻塞的线程,则唤醒哪一个线程是随机的。
notify_one() 函数声明如下所示:
void notify_one() noexcept;
在上述函数声明中,notify_one() 函数没有参数,没有返回值,并且不抛出任何异常。
用于唤醒所有被阻塞的线程。如果当前没有被阻塞的线程,则该函数什么也不做。
notify_all() 函数声明如下所示:
void notify_all() noexcept;
条件变量用于实现线程间通信,防止死锁发生,为了实现更灵活的上锁、解锁控制,条件变量通常与 unique_lock 结合使用。
【示例4】下面通过案例演示条件变量在并行编程中的使用,C++ 代码如下:
#include<iostream>
#include<chrono>
#include<thread>
#include<mutex>
#include<queue>
using namespace std;
queue<int> products; //创建队列容器products
mutex mtx; //创建互斥锁mtx
condition_variable cvar; //定义条件变量cvar
bool done = false; //定义变量done,表示产品是否生产完毕
bool notified = false; //定义变量notified,表示是否唤醒线程
void produce() //生产函数
{
for(int i = 1; i <= 5; i++)
{
//让当前线程休眠2 s
this_thread::sleep_for(chrono::seconds(2));
//创建unique_lock对象locker,获取互斥锁mtx
unique_lock<mutex> locker(mtx);
//生产产品,并将产品存放到products容器中
cout << "生产产品" << i << " ";
products.push(i);
//将notified值设置为true
notified = true;
//唤醒一个线程
cvar.notify_one();
}
done = true; //生产完毕,设置done的值为true
cvar.notify_one(); //唤醒一个线程
}
void consume() //定义消费函数
{
//创建unique_lock对象locker,获取互斥锁mtx
unique_lock<mutex> locker(mtx);
while(!done) //判断产品是否生产完毕
{
while(!notified) //避免虚假唤醒
{
cvar.wait(locker); //继续阻塞
}
while(!products.empty()) //如果products容器不为空
{
//消费产品
cout << "消费产品" << products.front() << endl;
products.pop();
}
notified = false; //消费完之后,将notified的值设置为false
}
}
int main()
{
thread producer(produce); //创建生产线程
thread consumer(consume); //创建消费线程
producer.join();
consumer.join();
return 0;
}
运行结果:
生产产品1 消费产品1
生产产品2 消费产品2
生产产品3 消费产品3
生产产品4 消费产品4
上产产品5 消费产品5
示例分析:
程序运行结果为:生产线程每生产一个产品,消费线程就消费一个产品。生产线程每生产完一个产品,就会将 notified 的值设置为 true,然后通过条件变量 cvar 调用 notify_one() 函数唤醒消费线程消费产品。
在并行编程中,共享资源同时只能有一个线程进行操作,这些最小的不可并行执行的操作称为原子操作。
原子操作都是通过上锁、解锁实现的,虽然使用lock_guard 和unique_lock简化了上锁、解锁过程,但是由于上锁、解锁过程涉及许多对象的创建和析构,内存开销太大。为了减少多线程的内存开销,提高程序运行效率,C++11 标准提供了原子类型 atomic。
atomic 是一个类模板,它可以接受任意类型作为模板参数。创建的 atomic 对象称为原子变量,使用原子变量就不需要互斥锁保护该变量进行的操作了。
【示例4】在使用原子类型之前来看一个案例,C++ 代码如下:
#include<iostream>
#include<thread>
#include<mutex>
using namespace std;
mutex mtx; //定义互斥锁
int num = 0; //定义全局变量num
void func()
{
lock_guard<mutex> locker(mtx); //加锁
for(int i = 0; i < 100000; i++)
{
num++; //通过for循环修改num的值
}
cout << "func()num: " << num << endl;
}
int main()
{
thread t1(func); //创建线程t1执行func()函数
thread t2(func); //创建线程t2执行func()函数
t1.join();
t2.join();
cout << "main()num: " << num << endl;
return 0;
}
运行结果:
func()num: 100000 func()num: 200000 main()num: 200000
示例分析:
由运行结果可知:
两个线程执行完毕之后,返回 main() 函数,主线程输出 num 的值。
如果使用原子类型定义全局变量 num,在修改 num 的值时,就不需要再给操作代码上锁,也能实现多个线程的互斥访问,保证某一时刻只有一个线程修改 num 的值。
【示例5】下面修改【示例4】,使用原子类型定义全局变量 num,C++ 代码如下:
#include<iostream>
#include<thread>
#include<atomic>
using namespace std;
atomic<int> num = 0;
void func()
{
for(int i = 0; i < 100000; i++)
{
num++;
}
cout << "func()num: " << num << endl;
}
int main()
{
thread t1(func);
thread t2(func);
t1.join();
t2.join();
cout << "main()num: " << num << endl;
return 0;
}
运行结果:
func()num: 165456 func()num: 200000 main()num: 200000
本例第 5 行代码将 num 定义为全局的原子变量,在 func() 函数中修改 num 的值时未上锁。
【示例5】程序运行过程中,线程 t1 与线程 t2 交叉执行 func() 函数,修改 num 的值,并不是一个线程先执行完成所有 for 循环。输出 num 值之后,另一个线程才能去执行 for 循环进行修改。因此,第一次输出的 num 值并不是 100000,但最终结果是正确的。
原子变量只保证num++是原子操作(第 10 行代码),使得原子操作颗粒度更细(【示例4】中,原子操作为第 10~14 行代码)。它相当于是在“num++”操作上上了锁,示例代码如下所示:
int num=0;
for(int i = 0; i < 100000; i++)
{
lock_guard<mutex> locker(mtx); //加锁
num++;
}
上述代码中,在 for 循环内部上了互斥锁,循环结束,locker 对象失效。
如果有多个线程修改 num,则多个线程会交叉修改 num 的值。但是,相比于上锁,原子类型实现的是无锁编程,内存开销小,程序的运行效率会得到极大提高,并且代码更简洁。