内核中的线程
在 Linux 中,无论是创建进程还是进程,最终调用的函数都是 clone:
int clone(int (*fn)(void *), void *stack, int flags, void *arg, ...
/* pid_t *parent_tid, void *tls, pid_t *child_tid */ );
创建线程和进程的区别为:
当创建线程时,调用的 flag 为 CLONE_VM | CLONE_FS | CLONE_FILES | CLONE_SIGHAND
子线程的 TGID(thread group id) 被设置为主进程的 PID
子线程依然会被分配一个 PID |
简而言之,线程层次可简单划分为:[1]
因而 内核区分线程的唯一方式就是查看进程的 PID 和 TGID 是否相同。
Linux 中。getpid 和 gettid 的实现如下:[2]
/**
* sys_getpid - 返回当前进程的 TGID
*
* 注意:尽管名字如此。但它返回的是 tgid 而不是 pid。除非在 clone() 中指定了
* CLONE_THREAD,否则 tgid 和 pid 是相同的。否则 tgid 在同一组的线程中都是
* 相同的。
*
* 这是 SMP 安全的,因为 current->tgid 没有改变
*/
SYSCALL_DEFINE0(getpid) {
return task_tgid_vnr(current);
}
/* 线程 ID - 内核内部的 "pid" */
SYSCALL_DEFINE0(gettid) {
return task_pid_vnr(current);
}
proc 中的线程
/proc 目录中会列出所有的进程,对目录进行遍历只能得到进程的结果。然而,依然可以通过线程的 pid 来获取线程的信息。这是因为 /proc 是一个虚拟的文件系统。具体解释由内核实现。
ps 中的线程
ps 中列出线程的方式为:
$ ps -eLf | egrep -i '(LWP)|(dsmaster)' UID PID PPID LWP C NLWP STIME TTY TIME CMD tea 9543 8712 9543 0 21 09:31 pts/0 00:00:00 ./dsmaster --terminal -l err tea 9543 8712 9544 0 21 09:31 pts/0 00:00:00 ./dsmaster --terminal -l err tea 9543 8712 9545 0 21 09:31 pts/0 00:00:00 ./dsmaster --terminal -l err
也可以使用 pstree 列出线程:
$ pstree -p 9543 dsmaster(9543)─┬─{dsmaster}(9544) └─{dsmaster}(9545)
线程标示
与进程 ID 不同的是,线程 ID 只在其所属的进程上下文中才有意义。另外线程 ID 的类型是 pthread_t,一般实现为结构体,而不是类似于 pid_t 那样的数字。线程 ID 的比较需要使用指定的函数:
函数 | 用途 |
pthread_self() | 获取线程的 ID |
pthread_euqal(tid1, tid2) | 比较线程 ID |
创建线程
创建线程需要的函数为:
int pthread_create(pthread_t* tid, const pthread_attr_t* attr, void(*fun)(void*), void* arg)
四个参数为:
参数 | 方向 | 作用 |
tid | 输出 | 用来获取新建线程的 tid,可为 null |
attr | 输入 | 用来设置线程的属性 |
fun | 输入 | 线程将要运行的函数 |
arg | 输入 | fun 的参数 |
从第四个参数可以看出,fun 的参数只能有一个。在 C++ 中,更多的参数可以使用 bind 来绑定
新线程会继承进程的进程空间
新线程继承父线程的浮点环境和信号屏蔽字
新线程不会继承父线程的挂起信号集
进程中线程之间的关系是平等的,相互可以发信号和通知关闭
进程中的信号处理函数是公用的
int main(int argc, char* argv[]) {
pthread_t parId = pthread_self();
pthread_t chID;
pthread_create(
&chID, nullptr,
[](void* arg) -> void* {
pthread_t id = pthread_self();
cout << (pthread_equal(id, *(pthread_t*)arg) == 0);
return nullptr;
},
&parId
);
pthread_join(chID, nullptr);
return 0;
}
从上面可以看到 C++ 中的 Lambda 与 pthread 配合的例子。
另外,和 pthread_join 相对应的函数是 pthread_detach
在上述例子中,尽管我们可以将 chID 传递给子线程,但是子线程不能使用它。原因是子线程的运行可能早于 chID 初始化的时机 |
线程在执行以下语句后会被终止:
调用 return 语句
调用 pthread_exit
被其它线程取消
函数 | 作用 |
noreturn void pthread_exit(void *retval) | 退出线程 |
int pthread_cancel(pthread_t thread) | 取消线程 |
取消线程只是一个请求,线程可以通过以下函数自由安排取消线程时执行的函数:
函数 | 作用 |
void pthread_cleanup_push(void(fun)(void), void* arg) | 添加清理程序 |
void pthread_cleanup_pop(int execute) | 删除处理程序 |
某些平台上,这两个函数的实现为宏。另外,这两个函数 必须 配套使用
互斥锁
唯一需要注意的是互斥锁请求锁的动作是进入阻塞状态而不是忙等。也就是说互斥锁不会无意义地浪费 CPU
互斥锁有两种:普通锁和超时锁,两者都使用 pthread_mutex_t,虽然加锁的函数不同。但无论是哪一种,都需要对互斥锁进行初始化
初始化的方式有两种:
调用 pthread_mutex_init
使用常量 PTHREAD_MUTEX_INITIALIZER 初始化
int pthread_mutex_init(pthread_mutex_t* mtx, const pthread_mutexattr_t* attr);
int pthread_mutex_lock(pthread_mutex_t* mtx);
int pthread_mutex_timedlock(pthread_mutex_t* mtx, const timespec* time);
int pthread_mutex_try_lock(pthread_mutex_t* mtx);
int pthread_mutex_unlock(pthread_mutex_t* mtx);
pthread_try_lock 失败的返回值为 EBUSY,成功的返回值为 0
pthread_mutex_timedlock 超时的返回值为 ETIMEOUT,其参数 time 是 绝对时间 而不是相对时间
互斥锁的属性使用 pthread_mutexattr_t 表示,使用时需要进行初始化,结束时需要进行析构
int pthread_mutexattr_init(pthread_mutexattr_t* attr);
int pthread_mutexattr_destory(pthread_mutexattr_t* attr);
pthread_mutexattr_init 会用默认的属性对 attr 进行初始化。要设置其它属性,需要使用额外的函数,这些属性中比较重要的有三个:进程共享属性、健壮属性、类型属性
进程共享属性用来设置锁是否能被其它进程更改,其有两个值:
值 | 作用 |
PTHREAD_PROCESS_PRIVATE | 锁只能被此进程内的线程更改 |
PTHREAD_PROCESS_SHARED | 锁可用于进程间同步 |
这个属性由以下函数设置:
int pthread_mutexattr_getpshared(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setpshared(pthread_mutexattr_t* attr, int pshared);
健壮属性用于进程间互斥锁,其控制了持有锁的进程结束时锁恢复的问题
简单来说,当持有锁的进程终止而其有没释放锁时,系统有两种行为:
值 | 作用 |
PTHREAD_MUTEX_STALLED | 系统不采取任何动作,行为未定义 |
PTHREAD_MUTEX_ROBUST | 其它进程会成功调用 pthread_mutex_lock 并得到的返回值 EOWNERDEAD |
int pthread_mutexattr_getrobust(const pthread_mutexattr_t* attr, int* pshared);
int pthread_mutexattr_setrobust(pthread_mutexattr_t* attr, int pshared);
也就是说,当我们使用了 PTHREAD_MUTEX_SHARED 后调用 pthread_mutex_lock 会陷入三种状态之一:
失败
成功且不需要恢复的锁
成功但需要恢复的锁
对于第三种情况,需要调用函数 pthread_mutex_consistent 来恢复锁的状态
int pthread_mutex_consistent(pthread_mutex_t* mtx);
如果锁在需要恢复的时候没有恢复而是直接解锁,那么其它尝试获取锁的进程就会得到错误码 ENOTRECOVERABKE,这种情况一旦发生,互斥锁就不再可用
锁的类型属性控制了锁的锁定特性,有四种类型:
值 | 作用 |
PTHREAD_MUTEX_NORMAL | 标准互斥锁,不提供错误检查和死锁检测 |
PTHREAD_MUTEX_ERRORCHECK | 提供错误检查的互斥锁 |
PTHREAD_MUTEX_RECURSIVE | 可被有锁的线程加锁 |
PTHREAD_MUTEX_DEFAULT | 被映射为上述三种锁之一 |
对于 Linux 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_NORMAL,而对于 FreeBSD 而言,PTHREAD_MUTEX_DEFAULT = PTHREAD_MUTEX_ERRORCHECK |
设置此属性需要以下函数:
int pthread_mutexattr_gettype(const pthread_mutexattr_t* attr, int* type);
int pthread_mutexattr_settype(pthread_mutexattr_t* attr, int* type);
读写锁
读写锁在其它场景下又被称为 共享锁 (Shared Lock)
和 排它锁 (eXclusive Lock)
,是一种 读友好 的锁。其具有以下特性:
加读锁时不允许其它线程写,但是允许其它线程读
加写锁时不允许其它线程读写
读写锁的构造函数和析构函数为:
int pthread_rwlock_init(pthread_rwlock_t* rwlock, const pthread_rwlockattr_t* attr);
int pthread_rwlock_destory(pthread_rwlock_t* rwlock);
读写锁也可以通过静态变量 PTHREAD_RWLOCK_INITIALIZER 初始化
读写锁必须要执行析构函数 |
对读写锁加读锁和写锁并解锁的接口为:
int pthread_rwlock_rdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_wrlock(pthread_rwlock_t* rwlock); // 加写锁
int pthread_rwlock_unlock(pthread_rwlock_t* rwlock); // 解锁
读写锁也提供了 try 版本:
int pthread_rwlock_tryrdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_trywrlock(pthread_rwlock_t* rwlock); // 加写锁
加锁成功时返回 0,失败时返回 EBUSY
也提供了具有超时功能的加锁功能:
int pthread_rwlock_timedrdlock(pthread_rwlock_t* rwlock); // 加读锁
int pthread_rwlock_timedwrlock(pthread_rwlock_t* rwlock); // 加写锁
读写锁唯一支持的属性就是 进程共享 属性,此功能与互斥锁的进程属性是一样的
条件变量
与互斥锁不同的是,条件变量将所有需要资源的线程放到一个队列中,并使其陷入阻塞中,当有资源可用时,可以通过唤醒一个或所有线程来消费资源。
条件变量是无锁的,但是由于条件本身不是原子的,因此需要锁来对其操作进行保护。
条件变量的使用必须进行构造和析构:
int pthread_cond_init(pthread_cond_t* cond, pthread_condattr_t attr);
int pthread_cond_destory(pthread_cond_t* cond);
使用条件变量获取资源:
int pthread_cond_wait(pthread_cond_t* cond, pthread_mutex_t* mtx);
int pthread_cond_timedwait(pthread_cond_t* cond, pthread_mutex_t* mtx, const timespec* time);
如果超时还没得到条件变量,则返回 ETIMEOUT
使用条件变量释放资源:
int pthread_cond_signal(pthread_cond_t* cond); // 至少唤醒一个线程
int pthread_cond_broadcast(pthread_cond_t* cond); // 唤醒所有进程
条件变量较关键的属性为:进程共享和时钟属性
时钟属性控制了 pthread_cond_timedwait 采用哪个时钟
int pthread_condattr_getclock(const pthread_condattr_t* attr, clockid_t clock_id);
允许的时钟 ID 为:
标识符 | 功能 |
CLOCK_REALTIME | 实时系统时间 |
CLOCK_MONOTONIC | 不带负跳数的实时系统时间 |
CLOCK_PROCESS_CPUTIME_ID | 调用进程的 CPU 时间 |
CLOCK_THREAD_CPUTIME_ID | 调用线程的 CPU 时间 |
自旋锁
和互斥锁在无法获取锁时进入阻塞状态不同,自旋锁使用忙等的形式不断尝试获取锁。因此自旋锁只适用于锁被持有的时间很短的情况下
实际上现在互斥锁的实现非常高效,一些互斥锁在尝试获取互斥量之前或自旋一段时间。 |
自旋锁在使用前和使用后需要进行初始化和销毁:
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);
自旋锁唯一的属性就是进程间共享属性,当 pshared == PTHREAD_PROCESS_PRIVATE 时,表明锁是进程间私有的,而当 pshared == PTHREAD_PROCESS_SHARED 时,表明锁可以被不属于当前进程的线程获取
其和互斥锁的接口类似:
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);
不要在持有自旋锁的情况下调用可能使进程陷入休眠状态的函数 |
在未初始化的自旋锁上调用 pthread_spin_lock 的行为是未定义的,在未加锁的自旋锁上调用 ptherad_spin_unlock 也是未定义的。
实际上我们可以使用互斥锁自己实现一个自旋锁:
void spin_lock(pthread_mutex_t* mtx){
while(pthread_mutex_try_lock(mtx) == EBUSY){
yield();
}
}
其中 yield 的作用是让出 CPU 。这样,spin_lock 会在无法获取锁的时候不断循环,直至得到锁
屏障
屏障用于将多个进程阻塞至某个点。pthread_join 就是一种屏障,它允许一个线程等待另一个线程退出
屏障必须进行初始化和销毁:
int pthread_barrier_init(pthread_barrier_t *restrict barrier, const pthread_barrierattr_t *restrict attr, unsigned count);
int pthread_barrier_destroy(pthread_barrier_t *barrier);
int pthread_barrier_wait(pthread_barrier_t *barrier);
count 参数表明了多少线程调用 pthread_barrier_wait 时线程才会被唤醒。
对于任意线程,只有一个线程 pthread_barrier_wait 的返回值为 PTHREAD_BARRIER_SERIAL_THREAD,其余线程均返回为 0,因此一个线程可以被视为主线程,其余线程被视为辅助线程
和锁相同,屏障也可以被重用
事务内存
GCC 对事务内存的支持被存放在库 libitm 中,请参阅 GNU libitm
线程池
线程池是池化技术的一种,所谓池化技术,就是:
以空间换取时间
提高资源的利用率
减少每次获取资源的消耗
实现对象的复用
当线程的创建销毁时间明显影响影响了系统的效率时(每个线程计算量很小),应当考虑使用线程池。另一方面,线程池会自动管理线程的数量,不会因为大量线程的创建导致系统效率低下
线程池包含三部分组件:
执行队列(线程)
任务队列(task)
管理组件
使用 C++ 可以方便地创建一个线程池:
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
class ThreadPool {
std::condition_variable cond_;
std::mutex mutex_;
bool terminate_ = false;
std::queue<std::thread> works_;
std::queue<std::function<void(void)>> jobs_;
ThreadPool() {
unsigned numWorksers = std::thread::hardware_concurrency();
for(size_t i = 0; i < numWorksers; ++i) {
auto t = std::thread(&ThreadPool::callback, this);
// thread 只有移动语义
works_.push(std::move(t));
}
}
protected:
void callback() {
while(true) {
// unique_lock 会自动加锁
std::unique_lock<std::mutex> lock(mutex_);
cond_.wait(lock, [this]() {
return !jobs_.empty() || this->terminate_; // 当 jobs_ 不为空或者 terminate_ == true 时跳出等待
});
if(this->terminate_) break;
auto job = jobs_.front();
jobs_.pop();
lock.unlock();
job();
}
}
public:
static ThreadPool& getInstance() {
static ThreadPool pool;
return pool;
}
~ThreadPool() {
this->terminate_ = true;
mutex_.lock();
cond_.notify_all();
mutex_.unlock();
while(!this->works_.empty()) {
std::thread t = std::move(works_.front());
works_.pop();
t.join();
}
}
void push(std::function<void(void)> func) {
mutex_.lock();
jobs_.push(func);
cond_.notify_one();
mutex_.unlock();
}
};
对于线程池中线程数量的选择,应遵循以下原则: skyformat99/thread_pool
计算密集型任务:线程个数 = CPU个数
I/O密集型任务:线程个数 > CPU个数
How can I detect that a thread pool work item is taking too long?