Sirius' blog Sirius' blog
首页
  • 学习笔记

    • 《C++》
    • 《MATLAB》
    • 《Python》
  • 学习笔记

    • 《Git》
    • 《CMake》
  • 技术文档
  • 博客搭建
  • 学习
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)

Sirius0v0

怕什么真理无穷,进一寸有一寸的欢喜
首页
  • 学习笔记

    • 《C++》
    • 《MATLAB》
    • 《Python》
  • 学习笔记

    • 《Git》
    • 《CMake》
  • 技术文档
  • 博客搭建
  • 学习
  • 友情链接
关于
收藏
  • 分类
  • 标签
  • 归档
GitHub (opens new window)
  • 基础

  • Cpp小记
  • C++学习笔记-220705-V1
  • 进阶

    • CC++模板
    • CC++STL-常用容器
    • CC++STL-函数对象
    • CC++STL-常用算法
    • 现代C++:常量、变量与类型推导
    • 现代C++:函数式编程(lambda、函数对象包装器)
    • 现代C++:右值引用、移动语义与完美转发
    • 现代C++:智能指针
    • C++11多线程编程
      • 时间标准库 std::chrono
        • 将时间段设置为double类型
      • 跨平台的sleep
        • 睡一个时间段:std::this_thread::sleep_for
        • 睡到时间点:std::this_thread::sleep_until
      • 线程
      • 现代C++中的多线程:std::thread
        • 一个简单的例子
        • 主线程等待子线程结束:t.json()
        • std::thread的解构函数会销毁线程
      • 异步
        • 使用 std::async 写异步
        • 使用std::launch::deferred作为std::async的参数
        • std::async的底层实现
      • 互斥量
        • std::mutex上锁
        • std::lock_guard: 符合RAII思想的上锁与解锁
        • std::unique_lock
        • 如果上锁失败,不要等待:try_lock()方法
        • 如果上锁失败,等一会:try_lock_for()方法
      • 死锁问题
        • 不同线程发生的死锁问题
        • 同一线程发生的死锁问题
      • 多线程中的数据结构
        • 读写锁
        • 访问者模式
      • 条件变量
        • 等待被唤醒
        • 等待某一条件成真
        • 条件变量应用
      • 原子操作
        • fetch_add: 与 += 等价
        • exchange:读取的同时写入
        • compare_exchange_strong读取,比较是否相等,相等则写入
  • 奇思妙用

  • 库的使用

  • 《C++》学习笔记
  • 进阶
Sirius0v0
2023-08-16
目录

C++11多线程编程

# 现代C++:开启多线程编程

# 时间标准库 std::chrono

利用C++强类型特点,将时间区分为:时间点与时间段,明确区分不同的时间单位。

  • 时间点类型:chrono::steady_clock::time_point等;
  • 时间段类型:chrono::milliseconds,chrono::seconds,chrono::minutes等。
auto t0 = chrono::steady_clock::now();  // 获取当前时间点
auto t1 = chrono::seconds(30) + t0; // 获取当前时间后30秒的时间点
auto dt = t1 - t0;
int64_t sec = chrono::duration_cast<chrono::seconds>(dt).count();   // 获取时间差的秒数

# 将时间段设置为double类型

duration_cast 可以在任意的 duration 类型之间转换:duration<T,R> 用类型T表示,且时间单位是R(缺省值为秒),常用的R包括 std::milli毫秒,std::micro微秒。即,seconds 是 duration<int64_t> 的类型别名,milliseconds 是 duration<int64_t, std::milli> 的类型别名,可以由此创建double类型的:

using double_ms = std::chrono::duration<double, std::milli>;

# 跨平台的sleep

# 睡一个时间段:std::this_thread::sleep_for

std::this_thread::sleep_for(std::chrono::milliseconds(400));

# 睡到时间点:std::this_thread::sleep_until

auto t = std::chrono::steady_clock::now()+std::chrono::milliseconds(400);
std::this_thread::sleep_until(t);

# 线程

  • 进程简单来说是程序的一次执行,打开了一个应用软件就开启了一个进程,每个进程拥有独立的内存空间,开销较大;
  • 线程是进程中的一个实体,是CPU可执行调度的最小单位,每个线程共享同样的内存空间,开销较小。

也就是说,进程本身不能获取CPU时间,只有它的线程才可以。

# 现代C++中的多线程:std::thread

# 一个简单的例子

#include <thread>
#include <chrono>

void download(std::string filename)
{
    for (int i = 0; i < 10; i++)
    {
        std::cout << "正在下载" << filename << "...( " << (i + 1) * 10 << "% )\n";
        std::this_thread::sleep_for(std::chrono::milliseconds(400));
    }
}

int main()
{
    using namespace std::literals;
    std::thread t1([&]() {
        download("file1"s);
        });
    return 0;
}

std::thread 构造函数的参数可以是任意lambda表达式,当线程启动时,就会执行这个lambda里面的内容。

由于std::thread的实现背后是基于pthread的,所以需要链接Threads::Threads

CMakeLists.txt里链接即可:

find_package(Threads REQUIRED)
target_link_libraries(main PUBLIC Threads::Threads)

运行程序会出现报错,这是因为在main中会提前于子线程完成任务,在退出进程后,子线程会因此销毁,进而发生错误。

# 主线程等待子线程结束:t.json()

int main()
{
    std::thread t1([&](){
        download();
    });

    std::cout << "Waiting for child thread...\n";
    t1.join();
    std::cout << "Child thread exited.\n";
    return 0;
}

# std::thread的解构函数会销毁线程

由于std::thread同样遵循RAII思想和三五法则:自定义了析构函数,并删除了拷贝构造/赋值函数,但是提供了移动构造/赋值函数,因此,当线程t所在函数退出时,会调用相应的析构函数,会销毁t线程,这是我们不想看到的。

# 使用detach()分离线程

为了让析构函数不再销毁线程,我们可以使用t.detach()方法分离该线程:

void my_download()
{
    std::thread t([&] {
        download();
    });
    t.detach();
}

# 使用全局变量储存线程

然而detach的问题是进程退出的时候,不会等待所有子线程执行完毕,所以另一种解法是将线程对象移动到一个全局变量。

std::vector<std::thread> pool;

void myfunc()
{
    std::thread t1([&] {
        download("hello.zip");
    });
    pool.push_back(std::move(t1));
}

int main() 
{
    myfunc();
    std::cout << "Waiting for child thread...\n";
    for (auto& thread : pool) thread.join();
    std::cout << "Child thread exited.\n";
    return 0;
}

# 不够优雅?继续改

在main里面手动join全部线程仍然不够优雅,此时我们可以创建一个类,让其在析构函数中进行join,这样main退出后则会自动调用join。

class ThreadPool
{
    std::vector<std::thread> m_pool;

public:
    void push_back(std::thread thr)
    {
        m_pool.push_back(std::move(thr));
    }

    ~ThreadPool()
    {
        std::cout << "Waiting for child thread...\n";
        for (auto& thread : m_pool)
            thread.join();
        std::cout << "Child thread exited.\n";
    }
};

ThreadPool tpool;

void myfunc()
{
    std::thread t1([&]
        {
            download("hello.zip");
        });
    tpool.push_back(std::move(t1));
}

int main() {
    myfunc();
    return 0;
}

# C++20: std::jthread析构时自动调用join()

std::vector<std::jthread> pool;

void myfunc()
{
    std::jthread t1([&] {
        download("hello.zip");
    });
    pool.push_back(std::move(t1));
}

int main() 
{
    myfunc();
    return 0;
}

# 异步

# 使用 std::async 写异步

std::async 接受一个带返回值的 lambda,自身返回std::future对象。lambda的函数体将在另一个线程里执行。

调用future的get()方法,当任务未完成则会等待完成并获取返回值。

int download();

std::future<int> fret = std::async([&] {
    return download();
});
    
int ret = fret.get();
std::cout << "res = " << ret << '\n';

也可以使用fret.wait()方法显式地等待,没有返回值。

使用fret.wait_for(/* 时间段 */) 可以指定一个最长等待时间,会返回一个 std::future_status 表示等待是否成功。成则 future_status::ready,失败则 future_status::timeout。同理 wait_until() 类似,其参数是一个时间点的区别。

std::future<int> fret3 = std::async([&] {
    return download();
});

while (true)
{
    std::cout << "Waiting for download complete...\n";
    auto status = fret3.wait_for(std::chrono::milliseconds(1000));
    if(status == std::future_status::ready)
    {
        std::cout << "Future is ready~~\n";
        break;
    }
    else
    {
        std::cout << "Future not ready~~\n";
    }
}
std::cout << "res = " << fret3.get() << '\n';

# 使用std::launch::deferred作为std::async的参数

直接使用std::async将会创建一个线程,如果不想创建线程执行,使用std::launch::deferred作为参数,则会把lambda函数体的执行推迟到future的get()被调用的时候。

int download();

std::future<int> fret = std::async(std::launch::deferred, [&] {
    return download();
});
    
int ret = fret.get();

此写法只是函数式编程范式意义上的异步,而非真正的多线程异步,可以用这个实现惰性求值

# std::async的底层实现

如果需要手动创建线程,可以直接用std::promise,在线程返回时使用set_value()设置返回值,在主线程里,用get_future获取其std::future对象;同样地,get()方法可以等待并获取线程返回值。

std::promise<int> pret;
std::thread t1([&]
    {
        auto ret = download();
        pret.set_value(ret);
    });
std::future<int> fret = pret.get_future();

int res = fret.get();
std::cout << "res = " << res << '\n';

t1.join();

lambda的返回值可以是void,此时set_value()不接受参数,仅作为同步使用。

# 互斥量

当两个线程试图往一个数组里推数据,可能会出现数据竞争的问题,导致程序崩溃,因为vector不是一个多线程安全的容器。如何才能只让一个线程对数组进行操作?

# std::mutex上锁

调用std::mutex的lock()时,会检测mutex是否已经上锁,没有则上锁,有则等待,直到另一个线程解锁后,才再次上锁。而通过调用unlock()进行解锁操作。一般一个锁对应一个全局变量。

这样就可以保证mutex.lock()和mutex.unlock()之间的代码段,在同一时间只有一个线程在执行。

# std::lock_guard: 符合RAII思想的上锁与解锁

std::lock_guard的构造与析构函数中分别会调用mtx.lock()和mtx.unlock(),从而避免退出作用域时没有解锁的麻烦。

在std::lock_guard作用域范围内的操作均会被上锁,如果不想被锁,需要加上{}限制lock_guard的作用域,如下:

for (int i = 0; i < 10; i++)
{
    {
        std::lock_guard grd(mtx);
        std::cout << "Downloading "
            << " (" << i * 10 << "%)...\n";
    }
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

# std::unique_lock

std::lock_guard严格在析构时进行解锁,但是有时我们希望能够提前解锁,使用std::unique_lock即可,他会在析构时检查是否解锁并根据情况解锁。

for (int i = 0; i < 30; i++)
{
    std::unique_lock qrd(mtx);
    std::cout << "Downloading "
        << " (" << i * 10 << "%)...\n";
    qrd.unlock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

# 先不上锁,之后手动上锁:std::defer_lock参数

另外,std::unique_lock的构造函数还可以有一个额外的参数std::defer_lock。指定该参数则std::unique_lock不会在构造函数中上锁,需要手动调用lock()进行上锁。

for (int i = 0; i < 30; i++)
{
    std::unique_lock qrd(mtx, std::defer_lock);
    std::cout << "before the lock\n";
    qrd.lock();
    std::cout << "Downloading "
        << " (" << i * 10 << "%)...\n";
    qrd.unlock();
    std::this_thread::sleep_for(std::chrono::milliseconds(100));
}

查看源码可以得到std::defer_lock_t是空类,利用空tag类区分不同构造函数的思想在C++中很常见。

# 已经上锁,如何自动解锁:std::adopt_lock参数

如果当前mutex已经上锁,但是之后仍然希望利用RAII思想进行管理,在析构的时候自动解锁,可以使用std::adopt_lock参数进行构造std::unique_lock或std::lock_guard。

mtx.lock();
std::uniqie_lock grd(mtx, std::adopt_lock);
// do something

# 构造中尝试上锁:std::try_to_lock参数

在构造函数中调用try_lock()方法(下面会提到),可以用.owns_lock() 方法判断是否上锁成功。

std::unique_lock grd(mtx, std::try_to_lock);
if (grd.owns_lock())
    std::cout << "success!";
else
    std::cout << "failed!";

# 如果上锁失败,不要等待:try_lock()方法

if (mtx.try_lock())
    std::cout << "success!\n";
else
    std::cout << "failed!\n";

if (mtx.try_lock())
    std::cout << "success!\n";
else
    std::cout << "failed!\n";

mtx.unlock();

# 如果上锁失败,等一会:try_lock_for()方法

如需等待一段时间,可用std::timed_mutex的try_lock_for()方法。

if (mtx.try_lock(std::chrono::milliseconds(500)))
    std::cout << "success!\n";
else
    std::cout << "failed!\n";

if (mtx.try_lock(std::chrono::milliseconds(500)))
    std::cout << "success!\n";
else
    std::cout << "failed!\n";

mtx.unlock();

# 死锁问题

# 不同线程发生的死锁问题

对于以下问题,由于指令并不是同步执行的,因此有可能出现这个情况:

std::thread t1 ([&] {
    for (int i = 0; i < 1000; i++){
        mtx1.lock();
        mtx2.lock();
        mtx2.unlock();
        mtx1.unlock();
    }
});
std::thread t2 ([&] {
    for (int i = 0; i < 1000; i++){
        mtx2.lock();
        mtx1.lock();
        mtx1.unlock();
        mtx2.unlock();
    }
});
t1.join();
t2.join();
  • t1执行给mtx1上锁 -> 成功
  • t2执行给mtx2上锁 -> 成功
  • t1执行给mtx2上锁 -> 失败,等待
  • t2执行给mtx1上锁 -> 失败,等待

由于互相锁着对方,因而等待无限制,造成了死锁问题。

# 解决1:永远不要同时持有两把锁

修改内部上锁解锁逻辑:

mtx1.lock();
mtx1.unlock(); // 在持有另一把锁之前现解锁
mtx2.lock();
mtx2.unlock();

# 解决2:保证双方的上锁顺序一致

只需要保证双方上锁的顺序一致,即可避免死锁。

# 解决3:用std::lock同时对多个上锁

如果没办法保证上锁顺序一致,可以用标准库的std::lock(mtx1, mtx2, ...) 函数,一次性对多个mutex上锁。这个函数保证在无论任意线程中调用的顺序是否相同,都不会产生死锁问题。

std::thread t1 ([&] {
    for (int i = 0; i < 1000; i++){
        std::lock(mtx1, mtx2);
        mtx2.unlock();
        mtx1.unlock();
    }
});
std::thread t2 ([&] {
    for (int i = 0; i < 1000; i++){
        std::lock(mtx2, mtx1);
        mtx1.unlock();
        mtx2.unlock();
    }
});
t1.join();
t2.join();

# 解决3-PLUS:使用RAII版本的std::lock:std::scoped_lock

std::thread t1 ([&] {
    for (int i = 0; i < 1000; i++){
        std::scoped_lock grd(mtx1, mtx2);
        // do something
    }
});
std::thread t2 ([&] {
    for (int i = 0; i < 1000; i++){
        std::scoped_lock grd(mtx2, mtx1);
        // do something
    }
});
t1.join();
t2.join();

# 同一线程发生的死锁问题

void other() {
    mtx1.lock();
    mtx1.unlock();
}
void func() {
    mtx1.lock();
    other();
    mtx1.unlock();
}
int main() {
    func();
}

func函数中上锁后调用other继续上锁则会陷入死锁中。

# 解决1:other里不要上锁

如题

# 解决2:改用std::recursive_mutex

使用std::recursive_mutex而非std::mutex。

前者会自动判断是不是同一个线程上锁多次,是则让计数器加一,解锁计数器会减一,直到0才会真正解锁,当然会产生一定的性能损失。

同理还有std::recursive_timed_mutex,如果同时需要try_lock_for的话。

# 多线程中的数据结构

利用锁实现线程安全的数据结构,举例实现线程安全的vector。

class MTVector
{
public:
    std::vector<int> m_vec;
    mutable std::mutex m_mtx;

    void push_back(int v)
    {
        m_mtx.lock();
        m_vec.push_back(v);
        m_mtx.unlock();
    }

    size_t size() const
    {
        m_mtx.lock();
        size_t ret = m_vec.size();
        m_mtx.unlock();
        return ret;
    }
};

在const函数中其实无法对std::mutex进行上锁与解锁操作,为了支持这类逻辑上const的函数,可以使用mutable关键字对mtx进行修饰,从而所有成员里只有他不是const的。

# 读写锁

这是一种提高性能的方式,对于同一个数据,可以多个人一起读,但是对于写,只允许一个人来操作。即,读可以共享,写必须独占,且写和读不能共存。

# 读写锁:std::shared_mutex

标准库提供的std::shared_mutex,可以对上述代码修改如下:

size_t size() const
{
    m_mtx.lock_shared();
    size_t ret = m_vec.size();
    m_mtx.unlock_shared();
    return ret;
}

因为push_back()是对数据的写操作,仍使用lock/unlock方法组合, 而对于size()是对数据的读操作,不修改数据,因此可以使用lock_shared/unlock_shared组合。

# std::shared_lock:符合RAII思想的lock_shared()

正如std::unique_lock之于lock(),std::shared_lock之于lock_shared(),这保证自动调用unlock_shared(),更加安全。

shared_lock 同样支持 defer_lock ,owns_lock() 等。

# 访问者模式

访问者模式主要将数据结构与数据操作分离,利用访问者模式,可以只需要上一次锁即可,而且复合RAII思想。

class MTVector
{
public:
    std::vector<int> m_vec;
    std::mutex m_mtx;

    class Accessor
    {
    public:
        MTVector& m_that;
        std::unique_lock<std::mutex> m_guard;

        Accessor(MTVector& that) :m_that(that), m_guard(that.m_mtx) {}

        void push_back(int v) const {
            return m_that.m_vec.push_back(v);
        }

        size_t size() const {
            return m_that.m_vec.size();
        }
    };

    Accessor access() {return { *this };}
};

# 条件变量

# 等待被唤醒

只有当某个事件发生后,线程才可以继续执行。std::condition_variable 的 notify_one() 用于唤醒一个线程; notify_all() 则是通知所有线程。下面是一个条件变量的举例 :

 std::condition_variable cv;
std::mutex mtx;

std::jthread t1([&] {
    std::unique_lock lck(mtx);
    cv.wait(lck);
    std::cout << "t1 is awake.\n";
});

std::cout << "Tasks begin.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "notifying...\n";
cv.notify_one();

# 等待某一条件成真

wait 可以额外指定一个参数:wait(lck, expr)。expr是个lambda表达式,只有返回值为true时才会真正被唤醒。

std::condition_variable cv;
std::mutex mtx;
bool ready = false;

std::jthread t1([&] {
    std::unique_lock lck(mtx);
    cv.wait(lck, [&] {return ready; });
    std::cout << "t1 is awake.\n";
});

std::cout << "Tasks begin.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cout << "notifying not ready\n";
cv.notify_one();

ready = true;
std::cout << "notifying ready\n";
cv.notify_one();

# 条件变量应用

# 实现生产-消费者模式

std::condition_variable food_cv;
std::mutex food_mtx;
std::vector<int> foods;
std::thread c1([&] {
    for (int i = 0; i < 2; i++) {
        std::unique_lock lck1(food_mtx);
        food_cv.wait(lck1, [&] {return foods.size() != 0; });
        std::cout << "C1 got food: " << foods.back() << '\n';
        foods.pop_back();
    }
});
std::thread c2([&] {
    for (int i = 0; i < 2; i++) {
        std::unique_lock lck1(food_mtx);
        food_cv.wait(lck1, [&] {return foods.size() != 0; });
        std::cout << "C2 got food: " << foods.back() << '\n';
        foods.pop_back();
    }
});

foods.push_back(12);
food_cv.notify_one();
foods.push_back(13);
food_cv.notify_one();
foods.push_back(14);
foods.push_back(15);
food_cv.notify_all();

c1.join();
c2.join();

# 将foods队列封装为一个类

template<typename T>
class MTQueue
{
    std::condition_variable m_cv;
    std::mutex m_mtx;
    std::queue<T> m_queue;

public:
    T pop() {
        std::unique_lock lck(m_mtx);
        m_cv.wait(lck, [this] {return !this->m_queue.empty(); });
        T ret = std::move(m_queue.front());
        m_queue.pop();
        return ret;
    }

    void push(T val) {
        std::unique_lock lck(m_mtx);
        m_queue.push(std::move(val));
        m_cv.notify_one();
    }

    void push_many(std::initializer_list<T> vals) {
        std::unique_lock lck(m_mtx);
        for (auto && val : vals)
        {
            m_queue.push(std::move(val));
        }
        m_cv.notify_all();
    }
};

std::condition_variable仅仅支持std::unique_lock<std::mutex>作为wait的参数,如果需要用其他类型的mutex锁,可以用std::condition_variable_any。

# 原子操作

一个经典案例:

int counter = 0;
std::thread t1([&] {
    for (int i = 0; i < 10000; ++i) {
        counter += 1;
    }
});
std::thread t2([&] {
    for (int i = 0; i < 10000; ++i) {
        counter += 1;
    }
});
t1.join();
t2.join();
std::cout << counter << '\n';

经典暴力解决方法是:用mutex上锁。

这样做的问题是,太过重量级,通过操作系统让线程被挂起,进入内核层,有很大的开销,严重影响了执行效率。因此,使用更轻量级的std::atomic,对他的+=等操作,编译器会转换为专门的指令,CPU识别到该指令,会保证该操作时原子的。

因此对于上述代码,只需要把int counter = 0;改为std::atomic<int> counter = 0;即可。

注意:counter = counter + 1; 不能保证原子性,需要使用+=,++ 类似的操作符。

# fetch_add: 与 += 等价

除了使用运算重载付之外,还可以直接调用相应的函数名,例如:

  • fetch_add: +=;
  • store: =;
  • load: 读取其中的值

另外,fetch_add还可以返回其旧值:int old = atm.fetch_add(value);,这个特点可以用于并行地往一个列表里追加数据:追加写入的索引值就是返回的旧值。

# exchange:读取的同时写入

exchange(value)会把value写入原子变量的同时,返回其旧的值。

std::atomic<int> counter;
counter.store(0);
int old = counter.exchange(3);
std::cout << "old = " << old << '\n'; // 0
int now = counter.load();
std::cout << "now = " << now << '\n'; // 3

# compare_exchange_strong读取,比较是否相等,相等则写入

compare_exchange_strong(old, value)会读取原子变量的值,比较他是否和old相等:

  • 相等,则把value写入原子变量
  • 不等,则把原子变量的值写入old;
  • 返回bool值表示是否相等
编辑 (opens new window)
#Cpp
上次更新: 2023/08/20, 00:05:30
现代C++:智能指针
默认构造返回多个类型

← 现代C++:智能指针 默认构造返回多个类型→

最近更新
01
ipopt优化库配置及使用
07-21
02
ubuntu离线安装包的方法
07-21
03
其它控件的使用
03-05
更多文章>
Theme by Vdoing | Copyright © 2020-2024 Sirius0v0 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式