您当前的位置:首页 > 计算机 > 编程开发 > VC/VC++

C++多线程编程入门教程

时间:10-06来源:作者:点击数:

在 C++11 标准之前,C++ 语言并没有对并行编程提供语言级别的支持,C++ 使用的多线程都由第三方库提供,如 POSIX 标准(pthread)、OpenMG 库或 Windows线程库,它们都是基于过程的多线程,这使得 C++ 并行编程在可移植性方面存在诸多不足。

为此,C++11 标准增加了线程及线程相关的类,用于支持并行编程,极大地提高了 C++ 并行编程的可移植性。

多线程

C++11 标准提供了 thread 类模板用于创建线程,该类模板定义在 thread 标准库中,因此在创建线程时,需要包含 thread 头文件。

thread 类模板定义了一个无参构造函数和一个变参构造函数,因此在创建线程对象时,可以为线程传入参数,也可以不传入参数。

注意,thread 类模板不提供拷贝构造函数、赋值运算符重载等函数,因此线程对象之间不可以进行拷贝、赋值等操作。

除了构造函数,thread 类模板还定义了两个常用的成员函数:join() 函数和 detach() 函数。

1) join() 函数

该函数将线程和线程对象连接起来,即将子线程加入程序执行。join() 函数是阻塞的,它可以阻塞主线程(当前线程),等待子线程工作结束之后,再启动主线程继续执行任务。

2) detach() 函数

该函数分离线程与线程对象,即主线程和子线程可同时进行工作,主线程不必等待子线程结束。但是,detach() 函数分离的线程对象不能再调用 join() 函数将它与线程连接起来。

【示例1】下面通过案例演示 C++11 标准中线程的创建与使用,代码如下:

#include<iostream>
#include<thread>   //包含头文件
using namespace std;
void func()   //定义函数func()
{
    cout << "子线程工作" << endl;
    cout << "子线程工作结束" << endl;
}
int main()
{
    cout << "主线程工作" << endl;
    thread t(func);   //创建线程对象t
    t.join();   //将子线程加入程序执行
    cout << "主线程工作结束" << endl;
    return 0;
}

运行结果:

主线程工作
子线程工作
子线程工作结束
主线程工作结束

示例分析:

  • 第 4~8 行代码定义了函数 func();
  • 第 12 行代码创建线程对象 t,传入 func() 函数名作为参数,即创建一个子线程去执行 func() 函数的功能;
  • 第 13 行代码调用 join() 函数阻塞主线程。

在 C++ 多线程中,线程对象与线程是相互关联的,线程对象出了作用域之后就会被析构,如果此时线程函数还未执行完,程序就会发生错误,因此需要保证线程函数的生命周期在线程对象生命周期之内。

一般通过调用 thread 中定义的 join() 函数阻塞主线程,等待子线程结束,或者调用 thread 中的 detach() 函数将线程与线程对象进行分离,让线程在后台执行,这样即使线程对象生命周期结束,线程也不会受到影响。

例如,在【示例1】中,将 join() 函数替换为 detach() 函数,将线程对象与线程分离,让线程在后台运行,再次运行程序,运行结果就可能发生变化。即使 main() 函数(主线程)结束,子线程对象 t 生命周期结束,子线程依然会在后台将 func() 函数执行完毕。

小提示:this_thread 命名空间

C++11 标准定义了 this_thread 命名空间,该空间提供了一组获取当前线程信息的函数,分别如下所示:

  • get_id() 函数:获取当前线程 id。
  • yeild() 函数:放弃当前线程的执行权。操作系统会调度其他线程执行未用完的时间片,当时间片用完之后,当前线程再与其他线程一起竞争 CPU 资源。
  • sleep_until() 函数:让当前线程休眠到某个时间点。
  • sleep_for() 函数:让当前线程休眠一段时间。

互斥锁

在并行编程中,为避免多线程对共享资源的竞争导致程序错误,线程会对共享资源进行保护。通常的做法是对共享资源上锁,当线程修改共享资源时,会获取锁将共享资源锁上,在操作完成之后再进行解锁。

加锁之后,共享资源只能被当前线程操作,其他线程只能等待当前线程解锁退出之后再获取资源。为此,C++11 标准提供了互斥锁 mutex,用于为共享资源加锁,让多个线程互斥访问共享资源。

mutex 是一个类模板,定义在 mutex 标准库中,使用时要包含 mutex 头文件。

mutex 类模板定义了三个常用的成员函数:lock() 函数、unlock() 函数和 try_lock() 函数,用于实现上锁、解锁功能。下面分别介绍这三个函数。

1) lock() 函数

lock() 函数用于给共享资源上锁。如果共享资源已经被其他线程上锁,则当前线程被阻塞;如果共享资源已经被当前线程上锁,则产生死锁。

2) unlock() 函数

unlock() 函数用于给共享资源解锁,释放当前线程对共享资源的所有权。

3) try_lock() 函数

try_lock() 函数也用于给共享资源上锁,但它是尝试上锁,如果共享资源已经被其他线程上锁,try_lock() 函数返回false,当前线程并不会被阻塞,而是继续执行其他任务;如果共享资源已经被当前线程上锁,则产生死锁。

【示例2】下面通过案例演示 C++11 标准中 mutex 的上锁、解锁的过程,C++ 代码如下:

#include<iostream>
#include<thread>
#include<mutex>
using namespace std;
int num = 0;   //定义全局变量num
mutex mtx;   //定义互斥锁mtx
void func()
{
    mtx.lock();   //上锁
    cout << "线程id: " << this_thread::get_id() << endl; //获取当前线程id
    num++;
    cout << "counter:" << num << endl;
    mtx.unlock();   //解锁
}
int main()
{
    thread ths[3];      //定义线程数组
    for(int i = 0; i < 3; i++)
        ths[i] = thread(func);   //分配线程任务
    for(auto& th : ths)
        th.join();   //将线程加入程序
    cout << "主线程工作结束" << endl;
    return 0;
}

运行结果:

线程id:  11128
counter: 1
线程id:  10596
counter: 2
线程id:  9392
counter: 3
主线程工作结束

示例分析:

  • 第 5 行代码定义了一个全局变量 num,初始值为 0;
  • 第 6 行代码定义了互斥锁 mtx;
  • 第 7~14 行代码定义了函数 func(),在 func() 函数内部,通过对象 mtx 调用 lock() 函数,为后面的代码上锁;
  • 第 13 行代码通过对象 mtx 调用 unlock() 函数解锁。当某个线程获取互斥锁 mtx 时,该线程会为第 10~12 行代码上锁,即拥有了 func() 函数的所有权,在解锁之前,其他线程不能执行 func() 函数;
  • 第 17 行代码定义了一个大小为 3 的线程数组 ths;
  • 第 18~19 行代码通过 for 循环为每个线程分配任务,即让线程执行 func() 函数;
  • 第 20~21 行代码通过 for 循环调用 join() 函数,将线程加入执行程序,并阻塞当前线程(主线程)。

由运行结果可知:

  • 首先线程“11128”获取了互斥锁 mtx,获得了 func() 函数的执行权,输出 counter 的值为 1,之后解锁;
  • 然后线程“10596”获取了互斥锁 mtx,获得了 func() 函数的执行权,输出 counter 值为 2,之后解锁;
  • 最后线程“9392”获取了互斥锁 mtx,获得了 func() 函数的执行权,输出 counter 值为 3,之后解锁。

如果注释掉第 9 行和第 13 行代码,即不给 func() 函数中的操作上锁,则三个线程会同时执行 func() 函数,输出的结果就会超出预期。例如,连续输出三个线程 id,或者先输出 counter 值为 3,再输出 counter 值为 1。

如果修改【示例2】,调用 try_lock() 函数为 func() 函数上锁,示例代码如下所示:

void func()
{
if (mtx.try_lock())   //调用try_lock()函数加锁
{
      cout << "线程id: " << this_thread::get_id() << endl;
      num++;
      cout << "counter:" << num << endl;
      mtx.unlock();
}
}

再次运行程序,只有一个线程执行 func() 函数。

当某个线程获取了互斥锁 mtx,就会为 func() 函数上锁,获得 func() 函数的执行权。另外两个线程调用 try_lock() 函数尝试上锁时,发现 func() 函数已经被其他线程上锁,这两个线程并没有被阻塞,而是继续执行其他任务(本案例中线程执行结束)。因此,最终只有一个线程执行 func()函数。

lock_guard和unique_lock

我们学习了互斥锁 mutex,通过 mutex 的成员函数为共享资源上锁、解锁,能够保证共享资源的安全性。但是,通过 mutex 上锁之后必须要手动解锁,如果忘记解锁,当前线程会一直拥有共享资源的所有权,其他线程不得访问共享资源,造成程序错误。

此外,如果程序抛出了异常,mutex 对象无法正确地析构,导致已经被上锁的共享资源无法解锁。为此,C++11 标准提供了 RAII 技术的类模板:lock_guard 和 unique_lock。

lock_guard 和 unique_lock 可以管理 mutex 对象,自动为共享资源上锁、解锁,不需要程序设计者手动调用 mutex 的 lock() 函数和 unlock() 函数。即使程序抛出异常,lock_guard 和 unique_lock 也能保证 mutex 对象正确解锁,在简化代码的同时,也保证了程序在异常情况下的安全性。

下面分别介绍 lock_guard 和 unique_lock。

1) lock_guard

lock_guard 可以管理一个 mutex 对象,在创建 lock_guard 对象时,传入 mutex 对象作为参数。

在 lock_guard 对象生命周期内,它所管理的 mutex 对象一直处于上锁状态;lock_guard 对象生命周期结束之后,它所管理的 mutex 对象也会被解锁。

【示例3】下面修改【示例2】来演示 lock_guard 的使用,C++ 代码如下:

#include<iostream>
#include<thread>
#include<mutex>
using namespace std;
int num = 0;   //定义全局变量num
mutex mtx;   //定义互斥锁mtx
void func()
{
    lock_guard<mutex> locker(mtx);  //创建lock_guard对象locker
    cout << "线程id: " << this_thread::get_id() << endl; //获取当前线程id
    num++;
    cout << "counter:" << num << endl;
}
int main()
{
    thread ths[3];   //定义线程数组
    for (int i = 0; i < 3; i++)
        ths[i] = thread(func);   //分配线程任务
    for (auto& th : ths)
        th.join();   //将线程加入程序
    cout << "主线程工作结束" << endl;
    return 0;
}

运行结果:

线程id:  12300
counter: 1
线程id:  13316
counter: 2
线程id:  2316
counter: 3
主线程工作结束

本例第 9 行代码创建了 lock_guard 对象 locker,传入互斥锁 mtx 作为参数,即对象 locker 管理互斥锁 mtx。

当线程执行 func() 函数时,locker 会自动完成对 func()函数的上锁、解锁功能。由运行结果可知,程序运行时,三个线程依旧是互斥执行 func() 函数。

注意,lock_guard 对象只是简化了 mutex 对象的上锁、解锁过程,但它并不负责 mutex 对象的生命周期。在【示例3】中,当 func() 函数执行结束时,lock_guard 对象 locker 析构,mutex 对象 mtx 自动解锁,线程释放 func() 函数的所有权,但对象 mtx 的生命周期并没有结束。

2) unique_lock

lock_guard 只定义了构造函数和析构函数,没有定义其他成员函数,因此它的灵活性太低。为了提高锁的灵活性,C++11 标准提供了另外一个 RAII 技术的类模板unique_lock

unique_lock 与 lock_guard 相似,都可以很方便地为共享资源上锁、解锁,但 unique_lock 提供了更多的成员函数,它有多个重载的构造函数,而且 unique_lock对象支持移动构造和移动赋值。

注意,unique_lock 对象不支持拷贝和赋值。

下面简单介绍几个常用的成员函数:

  • lock() 函数:为共享资源上锁,如果共享资源已经被其他线程上锁,则当前线程被阻塞;如果共享资源已经被当前线程上锁,则产生死锁。
  • try_lock() 函数:尝试上锁,如果共享资源已经被其他线程上锁,该函数返回 false,当前线程继续其他任务;如果共享资源已经被当前线程上锁,则产生死锁。
  • try_lock_for() 函数:尝试在某个时间段内获取互斥锁,为共享资源上锁,如果在时间结束之前一直未获取互斥锁,则线程会一直处于阻塞状态。
  • try_lock_until() 函数:尝试在某个时间点之前获取互斥锁,为共享资源上锁,如果到达时间点之前一直未获取互斥锁,则线程会一直处于阻塞状态。
  • unlock() 函数:解锁。

正是因为提供了更多的成员函数,unique_lock 才能够更灵活地实现上锁和解锁控制,例如,转让 mutex 对象所有权(移动赋值)、在线程等待时期解锁等。但是,更灵活的代价就是空间开销也更大,运行效率相对较低。

在编程过程中,如果只是为了保证数据同步,那么 lock_guard 完全能够满足使用需求。如果除了同步,还需要结合条件变量进行线程阻塞,则要选择 unique_lock。

小提示:RAII技术

RAII(Resource Acquisition Is Initialization,资源获取初始化)是 C++ 语言管理资源、避免内存泄漏的一个常用技术。

RAII 技术利用 C++ 创建的对象最终被销毁的原则,在创建对象时获取对应的资源,在对象生命周期内控制对资源的访问,使资源始终有效。当对象生命周期结束后,释放资源。

条件变量

在多线程编程中,多个线程可能会因为竞争资源而导致死锁,一旦产生死锁,程序将无法继续运行。为了解决死锁问题,C++11 标准引入了条件变量 condition_variable 类模板,用于实现线程间通信,避免产生死锁。

condition_variable 类模板定义了很多成员函数,用于实现进程通信的功能,下面介绍几个常用的成员函数。

1) wait()函数

wait() 函数会阻塞当前线程,直到其他线程调用唤醒函数将线程唤醒。当线程被阻塞时,wait() 函数会释放互斥锁,使得被阻塞在互斥锁上的其他线程能够获取互斥锁以继续执行代码。一旦当前线程被唤醒,它就会重新夺回互斥锁。

wait() 函数有两种重载形式,函数声明分别如下所示:

void wait(unique_lock<mutex>& lck);
template<class Predicate>
void wait(unique_lock<mutex>& lck, Predicate pred);

第一种重载形式称为无条件阻塞,它以 mutex 对象作为参数,在调用 wait() 函数阻塞当前线程时,wait() 函数会在内部自动通过 mutex 对象调用 unlock() 函数解锁,使得阻塞在互斥锁上的其他线程恢复执行。

第二种重载形式称为有条件阻塞,它有两个参数,第一个参数是 mutex 对象,第二个参数是一个条件,只有当条件为 false 时,调用 wait() 函数才能阻塞当前线程;在收到其他线程的通知后,只有当条件为 true 时,当前线程才能被唤醒。

2) wait_for()函数

wait_for() 函数也用于阻塞当前线程,但它可以指定一个时间段,当收到通知或超过时间段时,线程就会被唤醒。

wait_for() 函数声明如下所示:

cv_status wait_for(unique_lock<mutex>& lck, const chrono :: duration<Rep,Period>& rel_time);

在上述函数声明中,wait_for() 函数第一个参数为 unique_lock 对象,第二个参数为设置的时间段。

函数返回值为 cv_status 类型,cv_status 是 C++11 标准定义的枚举类型,它有两个枚举值:no-timeout 和 timeout。

no-timeout 表示没有超时,即在规定的时间段内,当前线程收到了通知;timeout 表示超时。

3) wait_until()函数

可以指定一个时间点,当收到通知或超过时间点时,线程就会被唤醒。

wait_until() 函数声明如下所示:

cv_status wait_until(unique_lock<mutex>& lck,const chrono::time_point<Clock,Duration>& abs_tim);

在上述函数声明中,wait_until() 函数第一个参数为 unique_lock 对象,第二个参数为设置的时间点。函数返回值为 cv_status 类型。

4) notify_one()函数

用于唤醒某个被阻塞的线程。如果当前没有被阻塞的线程,则该函数什么也不做;如果有多个被阻塞的线程,则唤醒哪一个线程是随机的。

notify_one() 函数声明如下所示:

void notify_one() noexcept;

在上述函数声明中,notify_one() 函数没有参数,没有返回值,并且不抛出任何异常。

5) notify_all()函数

用于唤醒所有被阻塞的线程。如果当前没有被阻塞的线程,则该函数什么也不做。

notify_all() 函数声明如下所示:

void notify_all() noexcept;

条件变量用于实现线程间通信,防止死锁发生,为了实现更灵活的上锁、解锁控制,条件变量通常与 unique_lock 结合使用。

【示例4】下面通过案例演示条件变量在并行编程中的使用,C++ 代码如下:

#include<iostream>
#include<chrono>
#include<thread>
#include<mutex>
#include<queue>
using namespace std;
queue<int> products;   //创建队列容器products
mutex mtx;   //创建互斥锁mtx
condition_variable cvar;   //定义条件变量cvar
bool done = false;   //定义变量done,表示产品是否生产完毕
bool notified = false;   //定义变量notified,表示是否唤醒线程
void produce()   //生产函数
{
    for(int i = 1; i <= 5; i++)
    {
        //让当前线程休眠2 s
        this_thread::sleep_for(chrono::seconds(2));
        //创建unique_lock对象locker,获取互斥锁mtx
        unique_lock<mutex> locker(mtx);
        //生产产品,并将产品存放到products容器中
        cout << "生产产品" << i << " ";
        products.push(i);
        //将notified值设置为true
        notified = true;
        //唤醒一个线程
        cvar.notify_one();
    }
    done = true;   //生产完毕,设置done的值为true
    cvar.notify_one();   //唤醒一个线程
}
void consume()   //定义消费函数
{
    //创建unique_lock对象locker,获取互斥锁mtx
    unique_lock<mutex> locker(mtx);
    while(!done)   //判断产品是否生产完毕
    {
        while(!notified)   //避免虚假唤醒
        {
              cvar.wait(locker);  //继续阻塞
        }
        while(!products.empty())   //如果products容器不为空
        {
              //消费产品
              cout << "消费产品" << products.front() << endl;
              products.pop();
        }
        notified = false;   //消费完之后,将notified的值设置为false
      }
}
int main()
{
    thread producer(produce);  //创建生产线程
    thread consumer(consume);  //创建消费线程
    producer.join();
    consumer.join();
    return 0;
}

运行结果:

生产产品1  消费产品1
生产产品2  消费产品2
生产产品3  消费产品3
生产产品4  消费产品4
上产产品5  消费产品5

示例分析:

  • 第 7~11 行代码分别定义了 queue<int> 类型的容器 products、互斥锁 mtx、条件变量 cvar 以及 bool 类型的变量 done、notified;
  • 第 12~30 行代码定义了生产函数 produce(),在该函数内部通过 for 循环生产产品;
  • 第 17 行代码先调用 sleep_for() 让当前线程休眠 2 s;
  • 第 19 行代码创建unique_lock对象 locker,获取互斥锁 mtx;
  • 第 21~22 行代码生产产品 i,并调用 push() 函数将i存储到 proudcts 队列容器中;
  • 第 24~26 行代码,每生产完一个产品,就将 notified 的值设置为 true,然后通过条件变量 cvar 调用notified_one() 函数唤醒一个线程。
  • 第 34 行代码创建了 unique_lock 对象 locker,获取互斥锁 mtx;
  • 第 35~48 行代码通过 while(!done) 循环判断生产是否完毕,在该 while 循环中消费产品;
  • 第 37~40 行代码通过判断 notified 的值是否为 true,来判断是否唤醒消费线程,避免虚假唤醒;
  • 第 41~47 行代码判断容器 products 是否为空,如果不为空,就消费产品;当产品消费完之后,即容器 products 为空,则设置 notified 的值为 false,将消费线程阻塞。
  • 第 52~55 行代码创建生产线程 producer 和消费线程 consumer,分别调用生产函数produce() 和消费函数consume()

程序运行结果为:生产线程每生产一个产品,消费线程就消费一个产品。生产线程每生产完一个产品,就会将 notified 的值设置为 true,然后通过条件变量 cvar 调用 notify_one() 函数唤醒消费线程消费产品。

原子类型

在并行编程中,共享资源同时只能有一个线程进行操作,这些最小的不可并行执行的操作称为原子操作。

原子操作都是通过上锁、解锁实现的,虽然使用lock_guard unique_lock简化了上锁、解锁过程,但是由于上锁、解锁过程涉及许多对象的创建和析构,内存开销太大。为了减少多线程的内存开销,提高程序运行效率,C++11 标准提供了原子类型 atomic

atomic 是一个类模板,它可以接受任意类型作为模板参数。创建的 atomic 对象称为原子变量,使用原子变量就不需要互斥锁保护该变量进行的操作了。

【示例4】在使用原子类型之前来看一个案例,C++ 代码如下:

#include<iostream>
#include<thread>
#include<mutex>
using namespace std;
mutex mtx;   //定义互斥锁
int num = 0;   //定义全局变量num
void func()
{
    lock_guard<mutex> locker(mtx);   //加锁
    for(int i = 0; i < 100000; i++)
    {
        num++;   //通过for循环修改num的值
    }
    cout << "func()num: " << num << endl;
}
int main()
{
    thread t1(func);   //创建线程t1执行func()函数
    thread t2(func);   //创建线程t2执行func()函数
    t1.join();
    t2.join(); 
    cout << "main()num: " << num << endl;
    return 0;
}

运行结果:

func()num:  100000
func()num:  200000
main()num:  200000

示例分析:

  • 第 5~6 行代码定义了互斥锁 mtx 和全局变量 num;
  • 第 7~15 行代码定义了 func() 函数,在该函数内部,通过 for 循环修改 num 的值,循环结束后输出 num 的值,并且使用 lock_guard 为 func() 函数上锁;
  • 第 18~19 行代码创建两个线程 t1 和 t2 执行 func() 函数,两个线程执行结束后输出 num 的值。

由运行结果可知:

  • 第一次 func() 函数输出 num 值为100000。func() 函数执行完毕之后,线程释放锁,接着另一个线程获取互斥锁,执行 func() 函数,再次修改 num 的值并输出。
  • 第二次 func() 函数输出 num 值为 200000。func() 函数执行完毕之后,线程释放锁。

两个线程执行完毕之后,返回 main() 函数,主线程输出 num 的值。

如果使用原子类型定义全局变量 num,在修改 num 的值时,就不需要再给操作代码上锁,也能实现多个线程的互斥访问,保证某一时刻只有一个线程修改 num 的值。

【示例5】下面修改【示例4】,使用原子类型定义全局变量 num,C++ 代码如下:

#include<iostream>
#include<thread>
#include<atomic>
using namespace std;
atomic<int> num = 0;
void func()
{
    for(int i = 0; i < 100000; i++)
    {
        num++;
    }
    cout << "func()num: " << num << endl;
}
int main()
{
    thread t1(func);
    thread t2(func);
    t1.join();
    t2.join(); 
    cout << "main()num: " << num << endl;
    return 0;
}

运行结果:

func()num:  165456
func()num:  200000
main()num:  200000

本例第 5 行代码将 num 定义为全局的原子变量,在 func() 函数中修改 num 的值时未上锁。

【示例5】程序运行过程中,线程 t1 与线程 t2 交叉执行 func() 函数,修改 num 的值,并不是一个线程先执行完成所有 for 循环。输出 num 值之后,另一个线程才能去执行 for 循环进行修改。因此,第一次输出的 num 值并不是 100000,但最终结果是正确的。

原子变量只保证num++是原子操作(第 10 行代码),使得原子操作颗粒度更细(【示例4】中,原子操作为第 10~14 行代码)。它相当于是在“num++”操作上上了锁,示例代码如下所示:

int num=0;
for(int i = 0; i < 100000; i++)
{
     lock_guard<mutex> locker(mtx);  //加锁
     num++;
}

上述代码中,在 for 循环内部上了互斥锁,循环结束,locker 对象失效。

如果有多个线程修改 num,则多个线程会交叉修改 num 的值。但是,相比于上锁,原子类型实现的是无锁编程,内存开销小,程序的运行效率会得到极大提高,并且代码更简洁。

方便获取更多学习、工作、生活信息请关注本站微信公众号城东书院 微信服务号城东书院 微信订阅号
推荐内容
相关内容
栏目更新
栏目热门