线程

鸣谢

非常感谢以下大牛的视频或文章,由于引入过多,所以没有在对应地方注明引用:

C++ 线程库在 Linux 下使用 pthread 作为底层,但是 pthread 并不是默认库,因此在编译时需要手动加上 -lpthread 以链接 pthread 库

线程间通信

  • 管道

  • 共享内存

  • 信号量

  • DBus

  • 消息队列

  • 套接字

线程对象

线程的创建有三种方式:

  • 函数及仿函数

    这时函线程的第一个参数为仿函数对象或函数指针,其后的参数为仿函数或函数的传入参数

  • 成员函数指针

    这是第一个参数是成员函数指针,第二个参数为对象,其后的参数为成员函数的参数

线程在对象创建时启动 。之后需要决定线程是以加入的方式函数以分离的方式运行。

  • 若线程以分离的方式运行,则此线程进入后台,主线程与线程不再拥有任何可直接交互的方式

  • 若线程以加入的方式运行,则主进程将会在调用 join() 的地方被阻塞至子线程运行结束

若线程被分离,则需要谨慎使用局部变量,防止局部变量在使用前被析构。 若线程被加入,则应当在线程抛出异常之前加入,否则加入操作无效。

  • 如果你传入的参数是一个引用对象,则需要使用 std::ref ,否则无法通过编译

  • 尽管线程分离后其生命周期就不再受主线程掌控,但是 C++ 保证在线程退出后其相关资源能被正确回收。

  • 如果创建进程失败,则抛出异常

线程只能被加入或分离一次。之后线程对象变为 不可加入的 。主线程就无法直接与子线程直接进行交互。之后可以使用 joinable() 测试线程是否可以执行加入或 分离 操作

若线程对象被析构时还没有决定加入还是分离线程(也就是说线程此时是可加入的),则直接调用 terminate() 中断线程。否则不做任何事情。[1]

程序员应当保证线程对象在被析构是是不可加入的,这是因为:

  • 在线程对象的析构函数中分离线程会导致很难正确地调试

  • 在线程对象的析构函数中加入线程在抛出异常时会导致 Bugs

调用成员函数

要在线程中调用成员函数,只需要将 thread 的第一个参数设置为成员指针,第二个参数设置为对象地址即可。

struct Test {
    int  a = 10;
    void print() {
        printf("%d", a);
    }
    void test() {
        std::thread t(&Test::print, this);
        t.join();
    }
};

int main(int argc, char* argv[]) {
    Test a;
    a.test();
    return 0;
}

但是对于 pthread_create 而言,则要求函数必须是静态函数

线程所有权

线程对象对线程拥有 唯一 所有权。这意味着线程对象只能拥有一个线程的所有权,而且这所有权只能被 移动 ,而不能被拷贝。当且仅当在线程对象的所有权被移交后,其才能获得其他线程的所有权,否则会导致程序崩溃:[2]

void some_function();
void some_other_function();
std::thread t1(some_function);
std::thread t2=std::move(t1);
t1=std::thread(some_other_function); // 临时对象默认使用移动语义
std::thread t3;
t3=std::move(t2);
t1=std::move(t3); // 程序崩溃

线程 id 和原生句柄

通过 get_id() 函数可以获得线程的 id 。如果线程对象没有拥有任何线程的所有权,则返回值为 std::thread::type() 的默认值。

通过 native_handle() 可以获得线程的原生句柄,这句柄与程序的运行平台相关。

async

async 有两个参数,第一个参数是一个枚举类型,第二个参数是一个可调用对象

第一个参数:

  1. std::launch::async 传递的可调用对象异步执行;

  2. std::launch::deferred 传递的可调用对象同步执行;

  3. std::launch::async | std::launch::deferred

可以异步或是同步,取决于操作系统,我们无法控制(默认)

其返回结果是一个 future 对象。async 的析构函数中会调用 join 等待任务完成

条件竞争

条件竞争的触发具有随机性,很难复现,但是后果却是破坏性的

线程互斥和线程同步

NOOTE: 条件竞争是指多线程的情况下,程序的运行结果与线程的运行顺序有关。实质上是由于进程间没有处理好数据共享的原因

条件竞争的解决方法有三种:

  • 保证数据的修改是原子性的

  • 使用互斥量保护临界资源

  • 使用日志和事务

使用互斥量、条件变量和信号量的内容在操作系统课程中有提到,这里不再赘述

互斥量

C++ 一共提供了三种互斥锁,其特性如下:

能力

std::mutex

独占互斥量,不能递归使用

std::recursive_mutex

递归互斥量,不带超时功能

std::timed_mutex

带超时的独占互斥量,不能递归使用

std::recursive_timed_mutex

带超时的递归互斥量

  • std::recursive_mutex:

    • 递归锁的递归是有计数器的,超过最大计数器会抛异常

    • 比非递归锁效率低

  • std::timed_mutex 可以设置一个 lock 的最大值,超时则放弃锁,相比普通的 std::mutex 而言,主要多了以下几点:

    • 可以使用 try_lock_for() 尝试获取锁,在达到最大时限后放弃

    • 可以使用 try_lock_until() 尝试获取锁,在达到时间点后放弃

一般用来等待某个条件,我们可以简单地使用 while(…​) 或者 std::mutex 。第一种形式一般会和一个名为 yield() 的函数相配合,此函数会告知操作系统让出 CPU 以减少忙等的时间,但是第二种不会, std::mutex 会导致线程挂起,因此无需进行 yield() 操作

退出的 进程 和被强行杀死的 进程 持有的 std::mutex 会被释放,因为互斥锁是系统资源,而且互斥锁进行 lock 时是没有额外开销的

C++ 提供了便利类 std::lock_guardunique_lock ,其在构造函数中为互斥量加锁,在析构函数中进行解锁。

两者的差别如下:[3]

  • unique_lock 也可以加 std::adopt_lock 参数,表示互斥量已经被 lock,不需要再重复 lock。该互斥量之前必须已经 lock,才可以使用该参数。

  • unique_lock 可以加 std::try_to_lock 参数用来尝试获取锁。如果获取失败,则 owns_locks() 返回 false

mutex 会导致 cache 刷新,在 lock 之后读入的全局变量都是最新的,而且 mutex lock 的挂起和恢复都会有用户态到核心态的切换,一共是两次。mutex 的代价是上百条指令。因此对于特别快的等待可以简单地使用忙等

在操作系统中存在有名互斥锁可供不同进程之间进行同步,但是 C++ 11 并不支持

Recursive lock 可以在同一个线程中被多次 lock,但是要求 lock 和 unlock 相匹配。

你可能需要将 mutex 声明为 mutable 以便在 get() const 函数中使用

lock_guard

lock_guard 用来锁定一个互斥锁,并在析构函数中解锁

相比普通的 lock_guard,unique_lock 具有更多的功能。例如延迟锁定,锁定的有限尝试、所有权转移、递归锁定并与条件变量一起使用。但是它性能也要比 lock_guard 差一点

构造函数可以传入以下参数:

参数含义

std::unique_lock

表明当前已经加锁,因此无需再加锁

std::try_to_lock

尝试加锁,失败就跳过去

std::defer_lock

先不枷锁,后面手动加

unique_lock 具有移动语义,无法被拷贝

条件变量

条件变量可以用来等待某一个条件的发生。

mutex              mut;
queue<int>         data_queue;
condition_variable cond;

thread pro([&]() {
   mut.lock();
   for(int i = 0; i < 1000; ++i) { data_queue.push(i); }
   mut.unlock();
   // 通知一个进程
   cond.notify_one();
});

thread wa([&]() {
   unique_lock<mutex> lk(mut);
   // 等待 data_queue 不为空
   cond.wait(lk, [&]() {
         return data_queue.size();
   });
   while(data_queue.size()) {
         cout << data_queue.front() << ' ';
         data_queue.pop();
   }
});
wa.join();
pro.join();
  • 条件变量更多的适用于 task 编程而不是 data 编程

  • 条件变量的 wait() 有两个重载版本,更加推荐使用参数多的那个,因为其会检查 pred() 的条件,而且另一个先 notify()wait() 是不行的,其更加底层

期望

期望用于获取异步任务的结果

future<int> res = std::async([]() {
   int sum = 0;
   for(int i = 1; i <= 100; ++i) sum += i;
   return sum;
});
cout << res.get() << endl;

如果在调用 get() 的时候异步任务还没有返回,那么会阻塞至任务结束

另外,还可以使用任务包简化期望的获取:

packaged_task<int(void)> task([]() {
   int sum = 0;
   for(int i = 1; i <= 100; ++i) sum += i;
   return sum;
});

auto   res = task.get_future();
thread t(move(task));
cout << res.get() << endl;
t.join();

如果 task 在调用中发生了异常,那么此异常将会被存储并在 future 调用 get 后被抛出

future 是不可拷贝的、线程不安全的、一次性的,所谓一次性是指在调用了 get 后 future 就失效了(再次调用会抛出异常)。如果想要在多个线程之间共享 future 的数据,则需要使用 shared_future:

promise<int>       promiseInt;
shared_future<int> sh(promiseInt.get_future());
thread             t([&sh]() {
   cout << sh.get() << endl;
});
promiseInt.set_value(10);
t.join();

Promise

Promise 可以用于在不同线程间同步数据:

promise<int> promiseInt;
auto         fut = promiseInt.get_future();
thread       t([&fut]() {
   cout << fut.get() << endl;
});
promiseInt.set_value(10);
t.join();

在调用 set_value 后,promise 对象状态会变为 ready,此时其它线程中的 future.get() 才能拿到数据

promise 还能使用 set_exception 来存入一个异常

锁存器

锁存器在构造函数中传入一个数字,当数字减少为零的时候锁存器状态就绪。使用 count_down 将锁存器减小,通过 wait 对线程进行阻塞:

latch                done(8);
vector<future<void>> data;
for(size_t i = 0; i < 8; ++i) {
   data.push_back(async(launch::async, [&]() {
      done.count_down();
   }));
}
done.wait();
cout << data.size();

栅栏

栅栏 (Barrier) 是一种高级同步原语,用于将一批线程阻塞到某一阶段。此原语自 C++ 20 可用。

CAS

C++ 中的 CAS 操作是 atomic 的成员函数,因此必须对 atomic 特化。有两个 cas 形式:

  • compare_exchange_weak

  • compare_exchange_strong

compare_exchange_weak 允许假失败。之所以会出现这个问题是因为 cas 是直接比较的内存内容,而不是调用了 operator== 函数。也因此 weak 比 strong 性能更好。一般推荐使用 while + weak 版本

CAS 执行这样一个操作:

if(*this == expected) *this = desired;
else expected = *this;

也就是说,如果比较失败,会将 expected 更新为 *this。这一点是非常重要的

cas 的 ABA 问题:

ABA 问题是指在 CAS 操作时,其他线程将变量值 A 改为了 B,但是又被改回了 A,等到本线程使用期望值 A 与当前变量进行比较时,发现变量 A 没有变,于是 CAS 就将 A 值进行了交换操作,但是实际上该值已经被其他线程改变过

TLS

get_tid() 是非常快的

TLS (Thread Local Storage) 线程本地储存。

TLS 的基本使用方式:(C++ 不需要)

  1. 申请 Slot

  2. 对对应的 Solt 进行读写,一般使用指针

什么时候使用?

  • 你需要和线程绑定一些数据(一个线程一般是没法访问另一个线程的 TLS 的)

  • 在不同函数中都需要访问

  • TLS 由于实现的问题(参考 get_tid() ),因此速度非常快,而且不需要同步,但是 TLS 是系统资源,是有限的,所以尽量将所有数据放在一起,只在 TLS 中传入指针

一个使用场景是将全局变量拷贝到 TLS 中,随意进行读写,在线程结束时再写回(要做好同步)

C++ 只需要使用 thread_local 就可以声明一个 TLS 变量,此变量在不同线程中会有不同的副本:

thread_local int a = 0;
mutex            mu;

thread t([&]() {
   mu.lock();
   a = 20;
   cout << a << endl;
   mu.unlock();
});
thread t1([&]() {
   mu.lock();
   a = 30;
   cout << a << endl;
   mu.unlock();
});
t.join();
t1.join();
cout << a << endl;

实际上互斥锁是不用加的,这里只是为了演示方便。线程 t 和 线程 t1 和主线程之间对变量 a 的修改是完全隔离的

内存模型与原子类型

C++ 最基本的 存储单元 (Storage Unit) 是字节,字节至少为 8 位,且必须在内存中连续。 C++ 在进行读写操作时的最小单位就是一个存储单元。[4]

存储位置 (Memory Location) 是一个 标量对象 (Object of Scalar Type) 或一个 完整的、非零长度的位域 。多个线程可以独立地访问不同的存储位置而无需担心它们发生冲突。

存储单元的长度为字节的整数倍,如果位域的长度无法被字节整除,这进行 内存对齐 。相邻的位域会进行合并。长度为零的无名位域会强制下一位域对齐到下一类型边界[5]

struct {
   char a;
   int b:5,
   c:11,
   :0,
   d:8,
   struct {int ee:8} e;
}

以上,a, d, e.ee 都位于独立的存储位置中,他们可以同时被不同的线程访问而不受任何影响,但是 b, c 共同 构成了四个内存位置,同时进行更新是不安全的。

当进程需要访问同一个储存单元时,若有进程执行更新操作,这为了保证数据的一致性,要么加锁,要么使用 原子变量

原子变量

原子类型定义在头文件 <atomic> 中。原子类型 std::atomic<T> 的实现要么是基于互斥量(有锁)的,要么是基于信号量(无锁)的。要判断到底是那种类型,可以使用 constexpr std::atomic<T>::is_always_lock_free 。对于基本类型而言,其定义了一系列便利宏用来判断是否有锁,例如:

ATOMIC_BOOL_LOCK_FREE
ATOMIC_CHAR_LOCK_FREE
ATOMIC_CHAR16_T_LOCK_FREE
ATOMIC_CHAR32_T_LOCK_FREE
ATOMIC_WCHAR_T_LOCK_FREE
ATOMIC_SHORT_LOCK_FREE
ATOMIC_INT_LOCK_FREE

宏的结果有三种:

0

无锁

1

运行时才能确定

2

有锁

std::atomic_flag<T> 总是无锁的。这种类型意味着在对类型 T 进行读写、测试并置位、清除的时候都是十分简单的。[6]

对于内置类型而言,其定义了一系列原子类型。如 atomic_char 等。尽量不要混合使用 atomic_charstd::atomic<char>

  • 系统调用对于用户态程序来说是原子的

  • 绝大多数泛型算法是线程安全的,因为这些都是无状态纯函数。(但是容器不是线程安全的)

内存顺序

内存顺序:[7]

内存顺序含义作用

memory_order_relaxed

自由顺序

不规定原子变量的读写顺序,这时写操作可能会被重新排序

mermory_order_release

释放有序

之前的读写不能重排到此操作之后

mermory_order_acquire

获取有序

之后的读写操作不能重排到此操作之前

memory_order_acq_rel

获取/释放有序

同时满足 mermory_order_release 和 mermory_order_acquire

memory_order_consume

消费有序

只规定指定变量的操作顺序

memory_order_seq_cst

顺序一致

默认。线程之间的顺序是确定的

例如:

atomic_bool x, y;
atomic_int  z = 0;

thread a([&]() {
   x.store(true, std::memory_order_relaxed);
   y.store(true, std::memory_order_relaxed);
});
thread b([&]() {
   while(!y.load(std::memory_order_relaxed));
   if(x.load(std::memory_order_relaxed)) ++z;
});
a.join();
b.join();

这里 z 的值可能是 0,因为 memory_order_relaxed 不保证顺序,有可能 y.store 排在 x.store 前面,而这时如果 b 线程再执行结束就会出现这种情况

而对于 memory_order_release 而言,假设现在有四个 store 操作,分别对 A,B,C,D 如果 C 是 memory_order_release 的,那么 A,B 不能被重排到 C 之后。但是 D 可能被重排到 C 之前。memory_order_release 用作数据发布,放在写操作的末尾

memory_order_acquire 与 memory_order_release 类似,但是后续的操作不能被重排到当前变量之前。适用于获取数据,放在写操作的开始

memory_order_release 和 memory_order_acquire 一般会成对出现,用来同步线程

memory_order_acq_rel 这是 memory_order_acquire 与 memory_order_release 的结合

memory_order_consume:

atomic_bool x = false;
atomic_bool y = false;

thread a([&]() {
   y.store(false, std::memory_order_release);
   x.store(true, std::memory_order_relaxed);
   y.store(true, std::memory_order_release);
});
thread b([&]() {
   while(!y.load(std::memory_order_relaxed));
   if(x.load(std::memory_order_relaxed))
      assert(x.load(std::memory_order_relaxed) == true);
});

这里只能保证 y 的内存操作是有序的,而 x 的内存序是没有规定的,因此为 false

  • 对当前要读取的内存施加 release 语义(store),在代码中这条语句后面所有与这块内存有关的读写操作都无法被重排到这个操作之前

  • 在这个原子变量上施加 release 语义的操作发生之后,consume 可以保证读到所有在 release 前发生的并且与这块内存有关的写入[8]

  • 程序员的自我修养(⑪):C++ 的内存顺序·上

内存屏障

和内存模型类似,内存屏障用于阻止代码重排,但是内存模型只适用于原子变量,而内存屏障作用于所有代码:

std::atomic_thread_fence(内存模型)

暂停线程和终止线程

暂停互斥量 加锁就可以暂停互斥量,将 终止标志位 置 1 就可以终止线程。

并行算法库

C++ 为一些标准库函数设计了并行版本,其主要特征是这些函数的第一个参数被用于设置 执行策略 。这些并行算法在头文件 <algorithm><numeric> 中。以下算法被重载为并行版本:

all_of
any_of
none_of
for_each
for_each_n
find
find_if
find_end
find_first_of
adjacent_find
count
count_if
mismatch
equal
search
search_n
copy
copy_n
copy_if
move
swap_ranges
transform
replace
replace_if
replace_copy
replace_copy_if
fill
fill_n
generate
generate_n
remove
remove_if
remove_copy
remove_copy_if
unique
unique_copy
reverse
reverse_copy
rotate
rotate_copy
is_partitioned
partition
stable_partition
partition_copy
sort
stable_sort
partial_sort
partial_sort_copy
is_sorted
is_sorted_until
nth_element
merge
inplace_merge
includes
set_union
set_intersection
set_difference
set_symmetric_difference
is_heap
is_heap_until
min_element
max_element
minmax_element
lexicographical_compare
reduce
transform_reduce
exclusive_scan
inclusive_scan
transform_exclusive_scan
transform_inclusive_scan和adjacent_difference

执行策略

这些内容定义在头文件 <execution>

执行策略有三种:

  • std::execution::sequenced_policy

  • std::execution::parallel_policy

  • std::execution::parallel_unsequenced_policy

管道

执行 Bash 命令并从管道中读取标准输出:[9]

#include<cstdlib>
#include<string>
#include<cstdio>
#include<cstring>
#include<iostream>
#include<algorithm>

using namespace std;

void Test(void){
   char line[300];
   FILE *fp;
   string cmd = "ps -ef| grep java | awk '{print $2}'";
   // 引号内是你的linux指令
   // 系统调用
   const char *sysCommand = cmd.data();
   if ((fp = popen(sysCommand, "r")) == nullptr) {
      cout << "error" << endl;
      return;
   }
   while (fgets(line, sizeof(line)-1, fp) != nullptr){
      cout << line ;
   }
   pclose(fp);
}

OpenMP

OpenMP 是 C++ 编译器的一个拓展,是一个可靠的跨平台的多线程使用方式,GCC, LLVM, MSVC 都支持它。

  • 对于 GCC 而言,只需要添加 -fopenmp 标志即可

对于 OpenMP,建议主要掌握下面两条原语:

  • #pragma omp parallel

  • #pragma omp parallel for

OpenMP 有以下几点需要注意:

  • omp_get_num_threads() 与 parallel block 有关,它可以在并行块之外调用,但是需要测试以下输出的值。要得到当前 CPU 核心数,更加推荐使用 std::thread::hardware_concurrency() 或者 omp_get_max_threads()

  • omp 的原语还有一些复杂用法,但是为了代码的可读性,并不推荐使用

  • 对于数据并行而言,更加推荐使用 OpenMP

事务性内存

事务性内存是 C++ 23 的 TS,但是自 GCC 6.1 其就可以通过 -fgnu-tm 启用此规范[10]

无锁编程

无锁编程的核心想法就是不使用 mutex,但是实际上无锁编程依然需要用到锁,而且无锁编程也不一定就比有锁编程好。核心实现是使用 atomic 以及其他操作,最重要的就是 CAS (Compare And Swap) 。只有当你的多线程程序是由于 mutex 切换导致的性能问题,而且等待时间非常短的时候才推荐无锁编程。

无锁编程的特点:

  • 给定足够长时间,至少有一个线程会取得进展

  • 无死锁

  • 需要注意 Cache

  • 会有忙等

  • 尽量使用 CAS

CAS 是一个由硬件保证原子性的原语, C++ 提供了两个 CAS: std::atomic_compare_exchange_weak/strong 。两者不同之处在于:

  • string 确保 obj 和 expected 相等的情况下一定会赋值, weak 不一定。但是当它们返回 true 时,一定是相等并赋值

  • 建议使用 while + weak 版本

CAS 会刷新流水线,所以执行 CAS 操作后写入的变量其它线程再次读取时会马上看到

多线程最佳实践

  • 避免等待,尤其是忙等。如果要使用忙等,请配合 yield

  • 避免访问公共资源

  • 永远不要 kill 线程

  • 无锁只读的对象也尽量只使用简单结构,比如尽量不要访问 std::vector ,而是访问由 data() 返回的指针

  • 任何类和方法在没有特别说明的情况下应当默认线程不安全

  • 线程间内存资源多使用只能指针

  • 任务分配尽量均衡

  • 操作的顺序非常重要,关键的代码尽量使用已知的范式

  • 尽量使用标准中的多线程容器

  • 对性能有非常高的地方请考虑使用无锁编程

线程安全的类

对象构造要做到线程安全,唯一的要求是在构造期间不要泄露 this 指针,即

  • 不要在构造函数中注册任何回调

  • 也不要在构造函数中把 this 传给跨线程的对象

  • 即便在构造函数的最后一行也不行

之所以如此规定,是因为在多线程中其他函数可能在对象还没有构造完成时调用构造函数。第三点之所以成立是因为这个类后续可能被继承

按序打印

我们提供了一个类:[11]

public class Foo {
public void first() { print("first"); }
public void second() { print("second"); }
public void third() { print("third"); }
}

三个不同的线程 A、B、C 将会共用一个 Foo 实例。

* 一个将会调用 first() 方法
* 一个将会调用 second() 方法
* 还有一个将会调用 third() 方法

请设计修改程序,以确保 second() 方法在 first() 方法之后被执行,third() 方法在 second() 方法之后被执行。

根据一个题解,性能为:

atomic < condition_variable < posix semaphore < mutex < promise

原子变量:

#include <atomic>
class Foo {
   atomic_bool firstDone = false;
   atomic_bool secondDone = false;
public:
   Foo() {}
   void first(function<void()> printFirst) {
      printFirst();
      firstDone = true;
   }
   void second(function<void()> printSecond) {
      while(!firstDone);
      printSecond();
      secondDone = true;
   }
   void third(function<void()> printThird) {
      while(!secondDone);
      printThird();
   }
};

条件变量:

class Foo {
   bool               firstReady  = false;
   bool               secondReady = false;
   mutex              mtx;
   condition_variable cv1, cv2;

public:
   Foo() { }

   void first(function<void()> printFirst) {
      lock_guard<mutex> l(mtx);
      printFirst();
      firstReady = true;
      cv1.notify_one();
   }

   void second(function<void()> printSecond) {
      unique_lock<mutex> ul(mtx);
      cv1.wait(ul, [&] {
            return this->firstReady;
      });

      printSecond();
      secondReady = true;
      cv2.notify_one();
   }

   void third(function<void()> printThird) {
      unique_lock<mutex> ul(mtx);
      cv2.wait(ul, [&] {
            return this->secondReady;
      });
      printThird();
   }
};

信号量:

#include <semaphore.h>
class Foo {
   sem_t firstJobDone;
   sem_t secondJobDone;

public:
   Foo() {
      sem_init(&firstJobDone, 0, 0);
      sem_init(&secondJobDone, 0, 0);
   }
   void first(function<void()> printFirst) {
      printFirst();
      sem_post(&firstJobDone);
   }
   void second(function<void()> printSecond) {
      sem_wait(&firstJobDone);
      printSecond();
      sem_post(&secondJobDone);
   }
   void third(function<void()> printThird) {
      sem_wait(&secondJobDone);
      printThird();
   }
};

互斥量:

class Foo {
   mutex m2;
   mutex m3;

public:
   Foo() {
      m2.lock();
      m3.lock();
   }
   void first(function<void()> printFirst) {
      printFirst();
      m2.unlock();
   }
   void second(function<void()> printSecond) {
      m2.lock();
      printSecond();
      m3.unlock();
   }
   void third(function<void()> printThird) {
      m3.lock();
      printThird();
   }
};

期望:

class Foo {
   promise<void> f2;
   promise<void> f3;

public:
   Foo() { }
   void first(function<void()> printFirst) {
      printFirst();
      f2.set_value();
   }
   void second(function<void()> printSecond) {
      f2.get_future().wait();
      printSecond();
      f3.set_value();
   }
   void third(function<void()> printThird) {
      f3.get_future().wait();
      printThird();
   }
};

交替打印FooBar

有了上面的铺垫,我们就能应对稍微复杂一些的例子:

我们提供一个类:[12]

class FooBar {
public void foo() {
   for (int i = 0; i < n; i++) {
      print("foo");
   }
}

public void bar() {
   for (int i = 0; i < n; i++) {
      print("bar");
   }
}
}

两个不同的线程将会共用一个 FooBar 实例。其中一个线程将会调用 foo() 方法,另一个线程将会调用 bar() 方法。

请设计修改程序,以确保 "foobar" 被输出 n 次。

class FooBar {
   int                n;
   bool               isOdd = true;
   mutex              mtx;
   condition_variable cond;

public:
   FooBar(int n) {
      this->n = n;
   }

   void foo(function<void()> printFoo) {

      for(int i = 0; i < n; i++) {
            unique_lock<mutex> ul(mtx);
            cond.wait(ul, [&] {
               return isOdd;
            });
            // printFoo() outputs "foo". Do not change or remove this line.
            printFoo();
            isOdd = false;
            cond.notify_one();
      }
   }

   void bar(function<void()> printBar) {

      for(int i = 0; i < n; i++) {
            unique_lock<mutex> ul(mtx);
            cond.wait(ul, [&]() {
               return !isOdd;
            });
            // printBar() outputs "bar". Do not change or remove this line.
            printBar();
            isOdd = true;
            cond.notify_one();
      }
   }
};

还可以用互斥锁:

class FooBar {
   int   n;
   mutex mu1;
   mutex mu2;

public:
   FooBar(int n) {
      this->n = n;
      mu2.lock();
   }

   void foo(function<void()> printFoo) {

      for(int i = 0; i < n; i++) {
            mu1.lock();
            // printFoo() outputs "foo". Do not change or remove this line.
            printFoo();
            mu2.unlock();
      }
   }

   void bar(function<void()> printBar) {

      for(int i = 0; i < n; i++) {
            // printBar() outputs "bar". Do not change or remove this line.
            mu2.lock();
            printBar();
            mu1.unlock();
      }
   }
};

数据结构

线程安全栈

构建一个线程安全的栈:

struct EmptyStack : public std::exception {
   const char* what() const noexcept override {
      return "Stack is empty";
   }
};

template<typename T>
class ThreadSafeStack {
   std::stack<T>      data;
   mutable std::mutex m;

public:
   ThreadSafeStack() = default;
   ThreadSafeStack(ThreadSafeStack const& b) {
      std::lock_guard<std::mutex> lock(b.m);
      data = b.data;
   }
   ThreadSafeStack(ThreadSafeStack&& b) {
      std::lock_guard<std::mutex> lock(b.m);
      data = std::move(b.data);
   }
   ThreadSafeStack& operator=(ThreadSafeStack const&) = default;
   void             push(T value) {
      std::lock_guard<std::mutex> lock(m);
      data.push(value);
   }
   void pop() {
      std::lock_guard<std::mutex> lock(m);
      if(data.empty()) throw EmptyStack();
      data.pop();
   }
   const T& top() {
      std::lock_guard<std::mutex> lock(m);
      if(data.empty()) throw EmptyStack();
      return data.top();
   }
   bool empty() const {
      std::lock_guard<std::mutex> lock(m);
      return data.empty();
   }
};

这里要提到一个问题就是。返回值的入栈是 先于 lock_guard 的析构的。

源代码用了不少的 std::move,但是实际上这些不应该调用

自旋锁

C++ 没有自旋锁,但是我们可以自己做一个:

class SpinLock {
   std::atomic_flag flag = ATOMIC_FLAG_INIT;

public:
   void lock() {
      while(flag.test_and_set(std::memory_order_acquire)) std::this_thread::yield();
   }
   void unlock() {
      flag.clear(std::memory_order_release);
   }
};

这里 std::this_thread::yield() 作用是让出线程(不过即使是调用,依然会占用大量 CPU,这里不清楚为什么)

无锁栈

使用 CAS 操作代替了互斥锁

struct EmptyStack : public std::exception {
   const char* what() const noexcept override {
      return "Stack is empty";
   }
};

template<typename T>
class LockFreeStack {
   struct Node {
      T     data = 0;
      Node* next = nullptr;
      Node()     = default;
      Node(T const& data) : data(data) { }
   };
   std::atomic<Node*> head;

public:
   void push(T const& data) {
      Node* const newNode = new Node(data);
      newNode->next       = head.load();
      // 失败后会自动使用 *this 替代 expected
      while(!head.compare_exchange_weak(newNode->next, newNode)) std::this_thread::yield();
   }
   void pop() {
      auto oldHead = head.load();
      if(!oldHead) throw EmptyStack();
      while(!head.compare_exchange_weak(oldHead, oldHead->next)) std::this_thread::yield();
      delete oldHead;
   }
   T top() {
      auto dataHead = head.load();
      if(!dataHead) throw EmptyStack();
      return dataHead->data;
   }
};

实际上这里的 pop 依然是不安全的,因为你不知道是不是有其它线程持有被删除的节点,在《C++ 并发编程实战(第二版)》中使用风险指针描述了这个特性,意指删除一个线程可能会导致其它线程处于不安全的状态中。原文使用了很多技巧,但是这里使用《Linux 多线程服务端编程》采用的智能指针(因为 handle 总是不会失效):

struct EmptyStack : public std::exception {
   const char* what() const noexcept override {
      return "Stack is empty";
   }
};

template<typename T>
class LockFreeStack {
   struct Node {
      shared_ptr<T> data = 0;
      Node*         next = nullptr;
      Node()             = default;
      Node(T const& data) : data(make_shared<T>(data)) { }
   };
   std::atomic<Node*> head;

public:
   void push(T const& data) {
      Node* const newNode = new Node(data);
      newNode->next       = head.load();
      // 失败后会自动使用 *this 替代 expected
      while(!head.compare_exchange_weak(newNode->next, newNode)) std::this_thread::yield();
   }
   void pop() {
      auto oldHead = head.load();
      if(!oldHead) throw EmptyStack();
      while(!head.compare_exchange_weak(oldHead, oldHead->next)) std::this_thread::yield();
      delete oldHead;
   }
   std::shared_ptr<T> top() {
      auto dataHead = head.load();
      if(!dataHead) throw EmptyStack();
      return dataHead->data;
   }
};

pop 时尽管节点会被删掉,但是数据不会被删掉

Last moify: 2022-12-04 15:11:33
Build time:2025-07-18 09:41:42
Powered By asphinx