C++11多线程编程
# 现代C++:开启多线程编程
# 时间标准库 std::chrono
利用C++强类型特点,将时间区分为:时间点与时间段,明确区分不同的时间单位。
- 时间点类型:
chrono::steady_clock::time_point
等; - 时间段类型:
chrono::milliseconds
,chrono::seconds
,chrono::minutes
等。
auto t0 = chrono::steady_clock::now(); // 获取当前时间点
auto t1 = chrono::seconds(30) + t0; // 获取当前时间后30秒的时间点
auto dt = t1 - t0;
int64_t sec = chrono::duration_cast<chrono::seconds>(dt).count(); // 获取时间差的秒数
# 将时间段设置为double类型
duration_cast
可以在任意的 duration
类型之间转换:duration<T,R>
用类型T表示,且时间单位是R(缺省值为秒),常用的R包括 std::milli
毫秒,std::micro
微秒。即,seconds
是 duration<int64_t>
的类型别名,milliseconds
是 duration<int64_t, std::milli>
的类型别名,可以由此创建double类型的:
using double_ms = std::chrono::duration<double, std::milli>;
# 跨平台的sleep
# 睡一个时间段:std::this_thread::sleep_for
std::this_thread::sleep_for(std::chrono::milliseconds(400));
# 睡到时间点:std::this_thread::sleep_until
auto t = std::chrono::steady_clock::now()+std::chrono::milliseconds(400);
std::this_thread::sleep_until(t);
# 线程
- 进程简单来说是程序的一次执行,打开了一个应用软件就开启了一个进程,每个进程拥有独立的内存空间,开销较大;
- 线程是进程中的一个实体,是CPU可执行调度的最小单位,每个线程共享同样的内存空间,开销较小。
也就是说,进程本身不能获取CPU时间,只有它的线程才可以。
# 现代C++中的多线程:std::thread
# 一个简单的例子
#include <thread>
#include <chrono>
void download(std::string filename)
{
for (int i = 0; i < 10; i++)
{
std::cout << "正在下载" << filename << "...( " << (i + 1) * 10 << "% )\n";
std::this_thread::sleep_for(std::chrono::milliseconds(400));
}
}
int main()
{
using namespace std::literals;
std::thread t1([&]() {
download("file1"s);
});
return 0;
}
std::thread
构造函数的参数可以是任意lambda表达式,当线程启动时,就会执行这个lambda里面的内容。
由于
std::thread
的实现背后是基于pthread的,所以需要链接Threads::ThreadsCMakeLists.txt里链接即可:
find_package(Threads REQUIRED) target_link_libraries(main PUBLIC Threads::Threads)
运行程序会出现报错,这是因为在main中会提前于子线程完成任务,在退出进程后,子线程会因此销毁,进而发生错误。
# 主线程等待子线程结束:t.json()
int main()
{
std::thread t1([&](){
download();
});
std::cout << "Waiting for child thread...\n";
t1.join();
std::cout << "Child thread exited.\n";
return 0;
}
# std::thread
的解构函数会销毁线程
由于std::thread
同样遵循RAII思想和三五法则:自定义了析构函数,并删除了拷贝构造/赋值函数,但是提供了移动构造/赋值函数,因此,当线程t所在函数退出时,会调用相应的析构函数,会销毁t线程,这是我们不想看到的。
# 使用detach()
分离线程
为了让析构函数不再销毁线程,我们可以使用t.detach()
方法分离该线程:
void my_download()
{
std::thread t([&] {
download();
});
t.detach();
}
# 使用全局变量储存线程
然而detach的问题是进程退出的时候,不会等待所有子线程执行完毕,所以另一种解法是将线程对象移动到一个全局变量。
std::vector<std::thread> pool;
void myfunc()
{
std::thread t1([&] {
download("hello.zip");
});
pool.push_back(std::move(t1));
}
int main()
{
myfunc();
std::cout << "Waiting for child thread...\n";
for (auto& thread : pool) thread.join();
std::cout << "Child thread exited.\n";
return 0;
}
# 不够优雅?继续改
在main里面手动join全部线程仍然不够优雅,此时我们可以创建一个类,让其在析构函数中进行join,这样main退出后则会自动调用join。
class ThreadPool
{
std::vector<std::thread> m_pool;
public:
void push_back(std::thread thr)
{
m_pool.push_back(std::move(thr));
}
~ThreadPool()
{
std::cout << "Waiting for child thread...\n";
for (auto& thread : m_pool)
thread.join();
std::cout << "Child thread exited.\n";
}
};
ThreadPool tpool;
void myfunc()
{
std::thread t1([&]
{
download("hello.zip");
});
tpool.push_back(std::move(t1));
}
int main() {
myfunc();
return 0;
}
# C++20: std::jthread
析构时自动调用join()
std::vector<std::jthread> pool;
void myfunc()
{
std::jthread t1([&] {
download("hello.zip");
});
pool.push_back(std::move(t1));
}
int main()
{
myfunc();
return 0;
}
# 异步
# 使用 std::async
写异步
std::async
接受一个带返回值的 lambda,自身返回std::future
对象。lambda的函数体将在另一个线程里执行。
调用future的get()
方法,当任务未完成则会等待完成并获取返回值。
int download();
std::future<int> fret = std::async([&] {
return download();
});
int ret = fret.get();
std::cout << "res = " << ret << '\n';
也可以使用fret.wait()
方法显式地等待,没有返回值。
使用fret.wait_for(/* 时间段 */)
可以指定一个最长等待时间,会返回一个 std::future_status
表示等待是否成功。成则 future_status::ready
,失败则 future_status::timeout
。同理 wait_until()
类似,其参数是一个时间点的区别。
std::future<int> fret3 = std::async([&] {
return download();
});
while (true)
{
std::cout << "Waiting for download complete...\n";
auto status = fret3.wait_for(std::chrono::milliseconds(1000));
if(status == std::future_status::ready)
{
std::cout << "Future is ready~~\n";
break;
}
else
{
std::cout << "Future not ready~~\n";
}
}
std::cout << "res = " << fret3.get() << '\n';
# 使用std::launch::deferred
作为std::async
的参数
直接使用std::async
将会创建一个线程,如果不想创建线程执行,使用std::launch::deferred
作为参数,则会把lambda函数体的执行推迟到future的get()
被调用的时候。
int download();
std::future<int> fret = std::async(std::launch::deferred, [&] {
return download();
});
int ret = fret.get();
此写法只是函数式编程范式意义上的异步,而非真正的多线程异步,可以用这个实现惰性求值
# std::async
的底层实现
如果需要手动创建线程,可以直接用std::promise
,在线程返回时使用set_value()
设置返回值,在主线程里,用get_future
获取其std::future
对象;同样地,get()
方法可以等待并获取线程返回值。
std::promise<int> pret;
std::thread t1([&]
{
auto ret = download();
pret.set_value(ret);
});
std::future<int> fret = pret.get_future();
int res = fret.get();
std::cout << "res = " << res << '\n';
t1.join();
lambda的返回值可以是void,此时set_value()
不接受参数,仅作为同步使用。
# 互斥量
当两个线程试图往一个数组里推数据,可能会出现数据竞争的问题,导致程序崩溃,因为vector不是一个多线程安全的容器。如何才能只让一个线程对数组进行操作?
# std::mutex
上锁
调用std::mutex
的lock()
时,会检测mutex是否已经上锁,没有则上锁,有则等待,直到另一个线程解锁后,才再次上锁。而通过调用unlock()
进行解锁操作。一般一个锁对应一个全局变量。
这样就可以保证mutex.lock()
和mutex.unlock()
之间的代码段,在同一时间只有一个线程在执行。
# std::lock_guard
: 符合RAII思想的上锁与解锁
std::lock_guard
的构造与析构函数中分别会调用mtx.lock()
和mtx.unlock()
,从而避免退出作用域时没有解锁的麻烦。
在std::lock_guard
作用域范围内的操作均会被上锁,如果不想被锁,需要加上{}
限制lock_guard的作用域,如下:
for (int i = 0; i < 10; i++)
{
{
std::lock_guard grd(mtx);
std::cout << "Downloading "
<< " (" << i * 10 << "%)...\n";
}
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
# std::unique_lock
std::lock_guard
严格在析构时进行解锁,但是有时我们希望能够提前解锁,使用std::unique_lock
即可,他会在析构时检查是否解锁并根据情况解锁。
for (int i = 0; i < 30; i++)
{
std::unique_lock qrd(mtx);
std::cout << "Downloading "
<< " (" << i * 10 << "%)...\n";
qrd.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
# 先不上锁,之后手动上锁:std::defer_lock
参数
另外,std::unique_lock
的构造函数还可以有一个额外的参数std::defer_lock
。指定该参数则std::unique_lock
不会在构造函数中上锁,需要手动调用lock()
进行上锁。
for (int i = 0; i < 30; i++)
{
std::unique_lock qrd(mtx, std::defer_lock);
std::cout << "before the lock\n";
qrd.lock();
std::cout << "Downloading "
<< " (" << i * 10 << "%)...\n";
qrd.unlock();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
查看源码可以得到
std::defer_lock_t
是空类,利用空tag类区分不同构造函数的思想在C++中很常见。
# 已经上锁,如何自动解锁:std::adopt_lock
参数
如果当前mutex已经上锁,但是之后仍然希望利用RAII思想进行管理,在析构的时候自动解锁,可以使用std::adopt_lock
参数进行构造std::unique_lock
或std::lock_guard
。
mtx.lock();
std::uniqie_lock grd(mtx, std::adopt_lock);
// do something
# 构造中尝试上锁:std::try_to_lock
参数
在构造函数中调用try_lock()
方法(下面会提到),可以用.owns_lock()
方法判断是否上锁成功。
std::unique_lock grd(mtx, std::try_to_lock);
if (grd.owns_lock())
std::cout << "success!";
else
std::cout << "failed!";
# 如果上锁失败,不要等待:try_lock()
方法
if (mtx.try_lock())
std::cout << "success!\n";
else
std::cout << "failed!\n";
if (mtx.try_lock())
std::cout << "success!\n";
else
std::cout << "failed!\n";
mtx.unlock();
# 如果上锁失败,等一会:try_lock_for()
方法
如需等待一段时间,可用std::timed_mutex
的try_lock_for()
方法。
if (mtx.try_lock(std::chrono::milliseconds(500)))
std::cout << "success!\n";
else
std::cout << "failed!\n";
if (mtx.try_lock(std::chrono::milliseconds(500)))
std::cout << "success!\n";
else
std::cout << "failed!\n";
mtx.unlock();
# 死锁问题
# 不同线程发生的死锁问题
对于以下问题,由于指令并不是同步执行的,因此有可能出现这个情况:
std::thread t1 ([&] {
for (int i = 0; i < 1000; i++){
mtx1.lock();
mtx2.lock();
mtx2.unlock();
mtx1.unlock();
}
});
std::thread t2 ([&] {
for (int i = 0; i < 1000; i++){
mtx2.lock();
mtx1.lock();
mtx1.unlock();
mtx2.unlock();
}
});
t1.join();
t2.join();
- t1执行给mtx1上锁 -> 成功
- t2执行给mtx2上锁 -> 成功
- t1执行给mtx2上锁 -> 失败,等待
- t2执行给mtx1上锁 -> 失败,等待
由于互相锁着对方,因而等待无限制,造成了死锁问题。
# 解决1:永远不要同时持有两把锁
修改内部上锁解锁逻辑:
mtx1.lock();
mtx1.unlock(); // 在持有另一把锁之前现解锁
mtx2.lock();
mtx2.unlock();
# 解决2:保证双方的上锁顺序一致
只需要保证双方上锁的顺序一致,即可避免死锁。
# 解决3:用std::lock
同时对多个上锁
如果没办法保证上锁顺序一致,可以用标准库的std::lock(mtx1, mtx2, ...)
函数,一次性对多个mutex上锁。这个函数保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。
std::thread t1 ([&] {
for (int i = 0; i < 1000; i++){
std::lock(mtx1, mtx2);
mtx2.unlock();
mtx1.unlock();
}
});
std::thread t2 ([&] {
for (int i = 0; i < 1000; i++){
std::lock(mtx2, mtx1);
mtx1.unlock();
mtx2.unlock();
}
});
t1.join();
t2.join();
# 解决3-PLUS:使用RAII版本的std::lock
:std::scoped_lock
std::thread t1 ([&] {
for (int i = 0; i < 1000; i++){
std::scoped_lock grd(mtx1, mtx2);
// do something
}
});
std::thread t2 ([&] {
for (int i = 0; i < 1000; i++){
std::scoped_lock grd(mtx2, mtx1);
// do something
}
});
t1.join();
t2.join();
# 同一线程发生的死锁问题
void other() {
mtx1.lock();
mtx1.unlock();
}
void func() {
mtx1.lock();
other();
mtx1.unlock();
}
int main() {
func();
}
func函数中上锁后调用other继续上锁则会陷入死锁中。
# 解决1:other里不要上锁
如题
# 解决2:改用std::recursive_mutex
使用std::recursive_mutex
而非std::mutex
。
前者会自动判断是不是同一个线程上锁多次,是则让计数器加一,解锁计数器会减一,直到0才会真正解锁,当然会产生一定的性能损失。
同理还有
std::recursive_timed_mutex
,如果同时需要try_lock_for
的话。
# 多线程中的数据结构
利用锁实现线程安全的数据结构,举例实现线程安全的vector。
class MTVector
{
public:
std::vector<int> m_vec;
mutable std::mutex m_mtx;
void push_back(int v)
{
m_mtx.lock();
m_vec.push_back(v);
m_mtx.unlock();
}
size_t size() const
{
m_mtx.lock();
size_t ret = m_vec.size();
m_mtx.unlock();
return ret;
}
};
在const函数中其实无法对std::mutex
进行上锁与解锁操作,为了支持这类逻辑上const的函数,可以使用mutable
关键字对mtx进行修饰,从而所有成员里只有他不是const的。
# 读写锁
这是一种提高性能的方式,对于同一个数据,可以多个人一起读,但是对于写,只允许一个人来操作。即,读可以共享,写必须独占,且写和读不能共存。
# 读写锁:std::shared_mutex
标准库提供的std::shared_mutex
,可以对上述代码修改如下:
size_t size() const
{
m_mtx.lock_shared();
size_t ret = m_vec.size();
m_mtx.unlock_shared();
return ret;
}
因为push_back()
是对数据的写操作,仍使用lock/unlock
方法组合,
而对于size()
是对数据的读操作,不修改数据,因此可以使用lock_shared/unlock_shared
组合。
# std::shared_lock
:符合RAII思想的lock_shared()
正如std::unique_lock
之于lock()
,std::shared_lock
之于lock_shared()
,这保证自动调用unlock_shared()
,更加安全。
shared_lock
同样支持defer_lock
,owns_lock()
等。
# 访问者模式
访问者模式主要将数据结构与数据操作分离,利用访问者模式,可以只需要上一次锁即可,而且复合RAII思想。
class MTVector
{
public:
std::vector<int> m_vec;
std::mutex m_mtx;
class Accessor
{
public:
MTVector& m_that;
std::unique_lock<std::mutex> m_guard;
Accessor(MTVector& that) :m_that(that), m_guard(that.m_mtx) {}
void push_back(int v) const {
return m_that.m_vec.push_back(v);
}
size_t size() const {
return m_that.m_vec.size();
}
};
Accessor access() {return { *this };}
};
# 条件变量
# 等待被唤醒
只有当某个事件发生后,线程才可以继续执行。std::condition_variable
的 notify_one()
用于唤醒一个线程; notify_all()
则是通知所有线程。下面是一个条件变量的举例 :
std::condition_variable cv;
std::mutex mtx;
std::jthread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck);
std::cout << "t1 is awake.\n";
});
std::cout << "Tasks begin.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "notifying...\n";
cv.notify_one();
# 等待某一条件成真
wait
可以额外指定一个参数:wait(lck, expr)
。expr是个lambda表达式,只有返回值为true时才会真正被唤醒。
std::condition_variable cv;
std::mutex mtx;
bool ready = false;
std::jthread t1([&] {
std::unique_lock lck(mtx);
cv.wait(lck, [&] {return ready; });
std::cout << "t1 is awake.\n";
});
std::cout << "Tasks begin.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "notifying not ready\n";
cv.notify_one();
ready = true;
std::cout << "notifying ready\n";
cv.notify_one();
# 条件变量应用
# 实现生产-消费者模式
std::condition_variable food_cv;
std::mutex food_mtx;
std::vector<int> foods;
std::thread c1([&] {
for (int i = 0; i < 2; i++) {
std::unique_lock lck1(food_mtx);
food_cv.wait(lck1, [&] {return foods.size() != 0; });
std::cout << "C1 got food: " << foods.back() << '\n';
foods.pop_back();
}
});
std::thread c2([&] {
for (int i = 0; i < 2; i++) {
std::unique_lock lck1(food_mtx);
food_cv.wait(lck1, [&] {return foods.size() != 0; });
std::cout << "C2 got food: " << foods.back() << '\n';
foods.pop_back();
}
});
foods.push_back(12);
food_cv.notify_one();
foods.push_back(13);
food_cv.notify_one();
foods.push_back(14);
foods.push_back(15);
food_cv.notify_all();
c1.join();
c2.join();
# 将foods队列封装为一个类
template<typename T>
class MTQueue
{
std::condition_variable m_cv;
std::mutex m_mtx;
std::queue<T> m_queue;
public:
T pop() {
std::unique_lock lck(m_mtx);
m_cv.wait(lck, [this] {return !this->m_queue.empty(); });
T ret = std::move(m_queue.front());
m_queue.pop();
return ret;
}
void push(T val) {
std::unique_lock lck(m_mtx);
m_queue.push(std::move(val));
m_cv.notify_one();
}
void push_many(std::initializer_list<T> vals) {
std::unique_lock lck(m_mtx);
for (auto && val : vals)
{
m_queue.push(std::move(val));
}
m_cv.notify_all();
}
};
std::condition_variable
仅仅支持std::unique_lock<std::mutex>
作为wait
的参数,如果需要用其他类型的mutex锁,可以用std::condition_variable_any
。
# 原子操作
一个经典案例:
int counter = 0;
std::thread t1([&] {
for (int i = 0; i < 10000; ++i) {
counter += 1;
}
});
std::thread t2([&] {
for (int i = 0; i < 10000; ++i) {
counter += 1;
}
});
t1.join();
t2.join();
std::cout << counter << '\n';
经典暴力解决方法是:用mutex上锁。
这样做的问题是,太过重量级,通过操作系统让线程被挂起,进入内核层,有很大的开销,严重影响了执行效率。因此,使用更轻量级的std::atomic
,对他的+=
等操作,编译器会转换为专门的指令,CPU识别到该指令,会保证该操作时原子的。
因此对于上述代码,只需要把int counter = 0;
改为std::atomic<int> counter = 0;
即可。
注意:
counter = counter + 1;
不能保证原子性,需要使用+=,++
类似的操作符。
# fetch_add
: 与 +=
等价
除了使用运算重载付之外,还可以直接调用相应的函数名,例如:
fetch_add
:+=
;store
:=
;load
: 读取其中的值
另外,fetch_add
还可以返回其旧值:int old = atm.fetch_add(value);
,这个特点可以用于并行地往一个列表里追加数据:追加写入的索引值就是返回的旧值。
# exchange
:读取的同时写入
exchange(value)
会把value写入原子变量的同时,返回其旧的值。
std::atomic<int> counter;
counter.store(0);
int old = counter.exchange(3);
std::cout << "old = " << old << '\n'; // 0
int now = counter.load();
std::cout << "now = " << now << '\n'; // 3
# compare_exchange_strong
读取,比较是否相等,相等则写入
compare_exchange_strong(old, value)
会读取原子变量的值,比较他是否和old相等:
- 相等,则把value写入原子变量
- 不等,则把原子变量的值写入old;
- 返回bool值表示是否相等