C++并发编程

《总之,好记性不如烂笔头!把你遗忘的都记下来吧!》

创建线程

std::thread

函数可以是普通函数成员函数函数对象lambda表达式

普通函数

1
std::thread(func, args...)
1
2
3
4
5
6
7
8
9
void printHelloWorld(const std::string &msg){
std::cout<<msg;

}
int main(){
std::thread thread1(printHelloWorld, "hello world");
thread1.join();
return 0;
}

类的非静态成员函数

1
2
A a;
std::thread t(&A::func, &a);

支持拷贝操作

1、 join() 阻塞主线程,等待子线程完成

**joinable()**判断对象能够汇合,如果不能汇合将返回false,如果对已经汇合过的线程调用join()将产生错误行为。

2、detach() 将子线程和主线程分离

如果等到std::thread对象销毁还没决定join()/detach() ,std::thread的析构函数将调用**std::terminate()**终止整个程序。

获取线程ID

线程ID所属的型别为std::thread::id, 可以通过调用std::this_thread::get_id()获得。

std::thread源码剖析

数据未定义错误/线程传递参数问题

1、传入函数对象临时变量导致构造函数语法与声明函数一致

如:std::thread my_thread(background_task()),此时将my_thread当作函数指针

解决方法:

1
2
3
4
// 1、多加一个括号
std::thread t((background_task()));
// 2、使用列表初始化
std::thread t{background_task()};
2、传入临时变量,但形参为引用

临时变量被传入参数后销毁,形参的引用导致了未定义的行为。

解决方法:

  • 通过智能指针传递参数,因为引用计数会随着赋值增加,保证变量使用期间不被释放,叫做:伪闭包策略
  • 将局部变量的值作为参数传递,但拷贝消耗时间空间
3、传递指针或者引用指向已经释放的内存
4、入口函数为类的私有成员函数

解决方法:将私有成员函数变为友元函数

5、移交线程归属权

不能将一个线程的归属权交给一个已经绑定线程的变量

在异常的情况下等待线程完结

当主线程发生异常时,可能会导致子线程的join()未执行,从而导致子线程不能执行完全。

1
2
3
4
5
6
7
8
9
10
11
// 使用异常捕获确保子线程被join()
void f(){
std::thread t(my_func);
try {
do_something_in_current_thread();
}catch(...){
t.join();
throw;
}
t.join();
}

通过RAII过程等待线程完结

1
2
3
4
5
6
7
8
9
10
11
12
13
class thread_guard {
public:
explicit thread_guard(std::thread &_t):t(_t){}
thread_guard(const thread_guard &) = delete;
thread_guard &operator=(const thread_guard &) = delete;
~thread_guard(){
if(t.joinable()){
t.join();
}
}
private:
std::thread &t;
};

控制运行的线程数量

函数**std::thread::hardware_concurrency()**表示程序在各次运行中可真正并发的线程数量。在多核系统上,该值可能就是CPU的核芯数量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
// 并发版本的accumulate
#include <iostream>
#include <thread>
#include <algorithm>
#include <numeric>
#include <vector>
#include <chrono>
#include <exception>
struct empty_stack : public std::exception{
const char *what() const throw();
};
template<typename Iterator, typename T>
struct accumlate_block{
void operator()(Iterator first, Iterator last, T& result){
result = std::accumulate(first, last, result);
}
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init){
auto length = std::distance(first, last);
if(!length)
return init;
const long min_per_thread = 25;
const long max_threads = (length + min_per_thread - 1) / min_per_thread;
const long hardware_threads = std::thread::hardware_concurrency();
const long num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2,
max_threads);
const long block_size = length / num_threads;
std::vector<T> results(num_threads);
std::vector<std::thread> threads(num_threads - 1);
Iterator block_start = first;
for (unsigned long i = 0; i < (num_threads - 1);++i){
Iterator block_end = block_start;
std::advance(block_end, block_size);
threads[i] = std::thread(
accumlate_block<Iterator, T>(), block_start, block_end, std::ref(results[i]));
block_start = block_end;
}
accumlate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
for(auto& entry: threads){
entry.join();
}
return std::accumulate(results.begin(), results.end(), init);
}

int main(int argc, char const *argv[])
{
std::vector<int> a;
for (int i = 0; i < 10000000;++i){
a.push_back(1);
}
// 并发的accumlate
auto beforeTime = std::chrono::steady_clock::now();
parallel_accumulate(a.begin(), a.end(), 0);
auto afterTime = std::chrono::steady_clock::now();
std::cout << std::chrono::duration<double>(afterTime - beforeTime).count() << std::endl;
// 非并发的accmulate
beforeTime = std::chrono::steady_clock::now();
std::accumulate(a.begin(), a.end(), 0);
afterTime = std::chrono::steady_clock::now();
std::cout << std::chrono::duration<double>(afterTime - beforeTime).count() << std::endl;
// 标准库中并发的std::reduce
beforeTime = std::chrono::steady_clock::now();
std::reduce(a.begin(), a.end(), 0);
afterTime = std::chrono::steady_clock::now();
std::cout << std::chrono::duration<double>(afterTime - beforeTime).count() << std::endl;
return 0;
}

存储std::thread容器

使用容器存储线程,如std::vector,由于std::thread没有拷贝构造函数使用push_back必须显示构造一个临时std::thread变量,然后利用移动构造函数移入容器。

而使用emplace_back可以直接在容器内部构造一个std::thread,能够减少一次移动构造。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class my_thread{
public:
void operator()()const{
}
};
int main(){
std::vector<std::thread> vec;
std::thread t1((my_thread()));
t1.join();
// 使用push_back()
vec.push_back(std::thread(my_thread()));
// 使用emplace_back()
vec.emplace_back((my_thread()));
for(auto &t:vec){
t.join();
}
return 0;
}

std::jthread

自动汇合(join on destruction):

  • std::jthread 在析构时自动调用 join(),确保线程在对象销毁前完成。这可以避免常见的资源泄漏问题或未处理的线程终止问题。
  • 对比之下,std::thread 在析构时如果线程仍然可运行(joinable),则会调用 std::terminate()

传递停止令牌(stop token):

  • std::jthread 提供了一种机制,可以传递一个 std::stop_token 给线程函数,用于请求线程停止。这是通过 std::jthread 构造函数的重载实现的,它会创建一个 std::stop_source 和一个 std::stop_token,并将 std::stop_token 传递给线程函数。
  • 这使得线程管理和停止控制更加方便和安全。

更好的 RAII 支持:

  • 由于 std::jthread 自动管理线程的生命周期(自动 join),它更符合 RAII(资源获取即初始化)原则,降低了出错的概率。

数据共享

std::mutex

利用了临界区的概念,创建了一个互斥的代码块。创建互斥锁

1、lock()加锁

2、unlock()解锁

std::lock

同时对两个锁进行加锁

1
std::lock(mtx1, mtx2);

std::timed_mutex

支持延迟加锁的互斥锁,当使用延迟加锁时,就使用这个锁来控制互斥量。

std::lock_guard

构造时加锁系,析构时解锁,只在局部作用域中使用

1
2
3
4
5
6
7
8
9
10
// 使用lock_guard需要注意使用{ }花括号来控制作用域!
while(true){
{
std::lock_guard<std::mutex> lock(mtx);
--shared_data;
std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
std::cout << "shared_data is " << shared_data << std ::endl;
}
std::this_thread::sleep_for(std::chrono::microseconds(1000));
} });

传递第二个参数为std::adopt_lock表示获取锁之前已经加锁了, 锁是领取过来的。通过std::lock_gruard就可以管理这个领取过来的锁了。

1
2
3
4
5
// 为了同时给mtx1,mtx2加锁
std::lock(mtx1, mtx2);
// 将这两个锁领养过来管理
std::lock_guard<std::mutex> guard1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> guard2(mtx2, std::adopt_lock)

std::scope_lock c++17

提供了另一种同时对多个锁进行加锁并控制的操作

1
std::scope_lock lock(mtx1, mtx2);

std::unique_lock

对互斥量的灵活加锁,构造时自动加锁,析构时解锁。

传递第二个参数std::adopt_lock指明对象管理互斥上的锁;

​ std::defer_lock使互斥在完成构造时为无锁状态,延迟加锁。

成员函数:

lock()自己加锁;

unlock()自己解锁;

owns_lock() 判断是否占有了锁;

try_lock_for()等待一段时间,如果获取不到锁就结束等待;

try_lock_until()等待到某个时间, 如果获取不到锁就结束等待;

std::shared_mutex c++17

  • 提供了 lock(), try_lock(), 和 try_lock_for() 以及 try_lock_until() 函数,这些函数都可以用于获取互斥锁。

  • 提供了 try_lock_shared()lock_shared() 函数,这些函数可以用于获取共享锁。

  • std::shared_mutex 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁。

使用std::shared_lock<>来构造控制共享锁, 独占的锁就由std::lock_guardstd::unique_lock控制

读操作需要使用共享锁的原因:

1、在读操作的过程中需要确保没有写操作,防止读入不完整的数据;

2、读操作之间是可以同时进行的;

std::shared_time_mutex c++14

  • std::shared_mutex 类似,也提供了 lock(), try_lock(), 和 try_lock_for() 以及 try_lock_until() 函数用于获取互斥锁。
  • std::shared_mutex 不同的是,它还提供了 try_lock_shared()lock_shared() 函数用于获取共享锁,这些函数在尝试获取共享锁时具有超时机制。
  • std::shared_timed_mutex 被锁定后,其他尝试获取该锁的线程将会被阻塞,直到该锁被解锁,这与 std::shared_mutex 相同。然而,当尝试获取共享锁时,如果不能立即获得锁,std::shared_timed_mutex 会设置一个超时,超时过后如果仍然没有获取到锁,则操作将返回失败。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// 使用共享锁共享读取的案例
#include <iostream>
#include <thread>
#include <mutex>
#include <map>
#include <shared_mutex>
class DNSService {
public:
DNSService(){}
// 查询操作使用共享锁
std::string QueryDns(std::string dnsName){
std::shared_lock<std::shared_mutex> lock(m_shared_mtx);
auto iter = dns_info.find(dnsName);
if(iter != dns_info.end()){
return iter->second;
}
return "";
}
// 写操作独占锁
void AddDnsInfo(std::string dsnName, std::string dsnEntry){
std::lock_guard<std::shared_mutex> lock(m_shared_mtx);
dns_info.insert(std::make_pair(dsnName, dsnEntry));
}

private:
std::map<std::string, std::string> dns_info;
mutable std::shared_mutex m_shared_mtx;
};

int main(int argc, char const *argv[])
{
DNSService dns;
std::thread t1([&dns]() { dns.AddDnsInfo("www.haha.com", "127.0.0.1"); });
std::thread t2(
[&dns]() { std::cout << dns.QueryDns("www.haha.com") << std::endl; });

t1.join();
t2.join();
return 0;
}

递归锁

在接口内部加锁,并调用另一个加锁的接口导致循环加锁而卡死,可以使用递归锁来来避免。

std::recursive_mutex,但是并不推荐使用这种操作,应该从设计上避免递归加锁。

线程安全的栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
template<typename T>
class safe_stack{
public:
// 默认构造函数
safe_stack(){}
// 拷贝构造函数
safe_stack(const safe_stack & other){
std::lock_guard<std::mutex> lock(other.mtx);
stk = other.stk;
}
// 禁用拷贝赋值运算符
safe_stack &operator=(const safe_stack &) = delete;
void push(T new_value){
std::lock_guard<std::mutex> lock(this->mtx);
stk.push(std::move(new_value));
}
std::shared_ptr<T> pop(){
std::lock_guard<std::mutex> lock(this->mtx);
if(stk.empty())
std::cerr << "pop error";
std::shared_ptr<T> ptr = std::make_shared<T>(stk.top());
stk.pop();
return ptr;
}

void pop(T& value){
std::lock_guard<std::mutex> lock(this->mtx);
if(stk.empty())
std::cerr << "pop error";
value = stk.top();
stk.pop();
}

private:
std::mutex mtx;
std::stack<T> stk;

};

线程安全的单例模式

局部静态变量实现单例

1
2
3
4
5
6
7
8
9
10
11
12
13
// 局部静态变量实现单例类 c11之前是线程不安全的
class SingleInstance{
private:
SingleInstance(){}
SingleInstance(const SingleInstance &) = delete;
SingleInstance& operator=(SingleInstance &) = delete;

public:
static SingleInstance& GetInstance(){
static SingleInstance single;
return single;
}
};

饿汉模式的单例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 饿汉模式实现单例
class SingleHungryInstance {
private:
SingleHungryInstance(){}
SingleHungryInstance(const SingleHungryInstance &) = delete;
SingleHungryInstance &operator=(SingleHungryInstance &) = delete;
public:
static SingleHungryInstance* GetInstance(){
if(single == nullptr){
single = new SingleHungryInstance;
}
return single;
}
private:
static SingleHungryInstance *single;
};
// 初始化类静态成员变量
SingleHungryInstance* SingleHungryInstance::single = new SingleHungryInstance;

懒汉模式的单例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class SingleLazyInstance {
private:
SingleLazyInstance(){}
SingleLazyInstance(const SingleLazyInstance &) = delete;
SingleLazyInstance &operator=(SingleLazyInstance &) = delete;
public:
static SingleLazyInstance* GetInstance(){
if(single != nullptr){
return single;
}
m_mtx.lock();
// 双检查锁
if(single != nullptr){
m_mtx.unlock();
return single;
}
single = new SingleLazyInstance();
m_mtx.unlock();
return single;
}

private:
static SingleLazyInstance *single;
static std::mutex m_mtx;
};
// 静态成员变量不要忘记类外初始化
SingleLazyInstance *SingleLazyInstance::single = nullptr;
std::mutex SingleLazyInstance::m_mtx;

std::call_once的单例

在单例模式下使用,std::once_flag实例对应一次不同的初始化。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// 使用std::call_once的单例模式
class SingleLazyInstance2 {
private:
SingleLazyInstance2(){}
SingleLazyInstance2(const SingleLazyInstance2 &) = delete;
SingleLazyInstance2 &operator=(SingleLazyInstance2 &) = delete;
public:
static SingleLazyInstance2* GetInstance(){
// 使用std::call_once确保single只被初始化一次
std::call_once(initFlag, []() {
single = new SingleLazyInstance2;
});
return single;
}
private:
static SingleLazyInstance2 *single;
static std::once_flag initFlag;
};
// 静态成员变量不要忘记类外初始化
SingleLazyInstance2 *SingleLazyInstance2::single = nullptr;
std::once_flag SingleLazyInstance2::initFlag;

使用std::call_once实现的单例模板类

// TODO

死锁

防范死锁的准则

  • 避免嵌套锁

如果已经持有了锁,就不要试图获取第二个锁。如果要获取多个锁,应该采用std::lock()/std::scope_lock,通过一次动作全部获取所有锁。

  • 使用固定的顺序依次获取锁

  • 层级锁

    在实际开发中很难规避在同一个函数内加多个锁的情况,我们要尽可能的避免循环加锁,可以自定义一个层级锁,保证对多个互斥量加锁时是有序的。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    #include <iostream>
    #include <thread>
    #include <mutex>

    class hierarchical_mutex {
    public:
    explicit hierarchical_mutex(unsigned long value)
    : hierarchy_value(value), previous_hierarchy_value(0){
    }
    hierarchical_mutex(const hierarchical_mutex &) = delete;
    hierarchical_mutex &operator=(const hierarchical_mutex &) = delete;
    void lock(){
    check_for_hierarchy_violation();
    internal_mutex.lock();
    update_hierarchy_value();
    }
    void unlock(){
    // 解锁的时候必须确保上一次加锁的也是该对象
    if(this_thread_hierarchy_value != hierarchy_value){
    throw std::logic_error("mutex hierarchy violated1");
    }
    // 恢复原来的层级
    this_thread_hierarchy_value = previous_hierarchy_value;
    internal_mutex.unlock();
    }
    bool try_lock(){
    check_for_hierarchy_violation();
    if(internal_mutex.try_lock()){
    return false;
    }
    update_hierarchy_value();
    return true;
    }
    private:
    void check_for_hierarchy_violation(){
    // 加锁的等级只能从高到底,否则抛出逻辑错误
    if(this_thread_hierarchy_value <= hierarchy_value){
    throw std::logic_error("mutex hierarchy violated2");
    }
    }
    void update_hierarchy_value(){
    previous_hierarchy_value = this_thread_hierarchy_value;
    this_thread_hierarchy_value = hierarchy_value;
    }
    private:
    std::mutex internal_mutex; // 层级锁
    const unsigned long hierarchy_value; // 当前层级值
    unsigned long previous_hierarchy_value; // 上一次层级值
    static thread_local unsigned long this_thread_hierarchy_value; // 在单个线程内只存在一个的线程静态变量,表示
    // 当前线程已经加了某个等级的锁
    };
    thread_local unsigned long
    hierarchical_mutex::this_thread_hierarchy_value(ULONG_MAX);


    void test_hierarchy_lock() {
    hierarchical_mutex hmtx1(1000);
    hierarchical_mutex hmtx2(500);
    std::thread t1([&hmtx1, &hmtx2]() {
    hmtx1.lock();
    hmtx2.lock();
    hmtx2.unlock();
    hmtx1.unlock();
    });

    std::thread t2([&hmtx1, &hmtx2]() {
    hmtx1.lock();
    hmtx2.lock();
    hmtx2.unlock();
    hmtx1.unlock();
    });

    t1.join();
    t2.join();
    }
    int main(int argc, char const *argv[])
    {
    test_hierarchy_lock();

    return 0;
    }

线程间传递变量

1、通过共享变量

2、通过参数传递

3、std::promise

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#include <iostream>
#include <future>

void func(std::promise<int> &f){
f.set_value(1000);
}

int main(int argc, char const *argv[])
{
std::promise<int> f;
std::future<int> future_res = f.get_future();
std::thread t1(func, std::ref(f));
t1.join();
std::cout << future_res.get() << std::endl;
return 0;
}

并发的同步

std::condition_variable条件变量

成员函数:

void wait( std::unique_lock< std::mutex> >& lock, Predicate pred );

wait导致当前线程被阻塞直到条件变量被通知,或者虚假唤醒发生,等待期间将解锁互斥。

虚假唤醒:线程重新获得互斥,并查验条件,而这一行为却不是通过另一个线程的通知。(可能是操作系统做出的唤醒)

1
2
3
4
// 当传递谓词时,其等价于
While(!pred())
wait(lock);
// 本质上是忙等的优化,while可以防止虚假唤醒导致线程阻塞失效

当谓词返回false时,线程进入阻塞或等待状态,解锁互斥;

当谓词返回true时,线程从wait()中返回,互斥仍被锁住。

wait_for()

阻塞当前线程,直到条件变量被唤醒,或者到达指定时长

wait_until()

阻塞当前线程,直到条件变量被唤醒,或者到指定时间点

notify_one()

通知一个等待的线程

notify_all()

通知所有等待的线程

使用条件变量循环打印消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>
std::mutex mtx;
std::condition_variable cv_a;
std::condition_variable cv_b;
int num = 1;
void do_printA(int &num){
while(true){
std::unique_lock<std::mutex> lock(mtx);
cv_a.wait(lock, [&num](){
return num == 1;
});

++num;
std::cout << "current thread num : " << num << std::endl;
lock.unlock();
// 通知阻塞线程
cv_b.notify_one();
// std::this_thread::sleep_for(std::chrono::microseconds(500));
}
}
void do_printB(int &num){
while(true){
std::unique_lock<std::mutex> lock(mtx);
cv_b.wait(lock, [&num](){
return num == 2;
});

--num;
lock.unlock();
std::cout << "current thread num : " << num << std::endl;
cv_a.notify_one();
// std::this_thread::sleep_for(std::chrono::microseconds(500));
}
}
int main(int argc, char const *argv[])
{
std::thread t1(do_printA, std::ref(num));
std::thread t2(do_printB, std::ref(num));
t1.join();
t2.join();
return 0;
}

使用条件变量实现生产者消费者模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
std::mutex g_mutex;
std::condition_variable g_cv;
std::queue<int> g_queue;

void Producer() {
for (int i = 0; i < 10; i++) {
{
std::unique_lock<std::mutex> lock(g_mutex);
g_queue.push(i);
std::cout << "Producer: produced " << i << std::endl;
}
g_cv.notify_one();
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
void Consumer() {
while (true) {
std::unique_lock<std::mutex> lock(g_mutex);
g_cv.wait(lock, []() { return !g_queue.empty(); });
int value = g_queue.front();
g_queue.pop();
std::cout << "Consumer: consumed " << value << std::endl;
}
}
int main() {
std::thread producer_thread(Producer);
std::thread consumer_thread(Consumer);
producer_thread.join();
consumer_thread.join();
return 0;
}

条件变量实现线程安全队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>

template<typename T>
class ThreadSafeQueue{
public:
ThreadSafeQueue(){}
ThreadSafeQueue(const ThreadSafeQueue& t){
std::lock_guard<std::mutex> lock(m_mtx);
m_queue = t.m_queue;
}
ThreadSafeQueue &operator=(ThreadSafeQueue &) = delete;
void push(T new_value){
std::lock_guard<std::mutex> lock(m_mtx);
m_queue.push(new_value);
m_cv.notify_one();
}
// 等待出栈,使用引用返回pop值
void wait_and_pop(T &value){
std::unique_lock<std::mutex> lock(m_mtx);
// pop为空时就阻塞挂起
m_cv.wait(lock, [this]()
{ return !m_queue.empty(); });
value = m_queue.front();
m_queue.pop();
}
// 等待出栈,使用智能指针返回pop值
std::shared_ptr<T> wait_and_pop(){
std::unique_lock<std::mutex> lock(m_mtx);
m_cv.wait(lock, [this]()
{ return !m_queue.empty(); });
auto ptr = std::make_shared<T>(m_queue.front());
m_queue.pop();
return ptr;
}
// 尝试出栈,成功出栈返回true
bool try_pop(T &value){
std::lock_guard<std::mutex> lock(m_mtx);
if(m_queue.empty())
return false;
value = m_queue.front();
m_queue.pop();
return true;
}
std::shared_ptr<T> try_pop(){
std::lock_guard<std::mutex> lock(m_mtx);
if(m_queue.empty())
return std::make_shared<T>(NULL);
auto ptr = std::make_shared<T>(m_queue.front());
m_queue.pop();
return ptr;
}

bool empty() const{
std::lock_guard<std::mutex> lock(m_mtx);
return m_queue.empty();
}

private:
mutable std::mutex m_mtx; // 声明为mutable,即使为常成员函数也能修改其值
std::queue<T> m_queue;
std::condition_variable m_cv;
};

int main(int argc, char const *argv[])
{
// 测试用例,创建一个生产者线程不断向队列中加入元素1....n,
// 有两个消费者线程从队列中取数据,其中消费者A使用等待的方式来取数据,消费者B使用尝试的方式来取数据
ThreadSafeQueue<int> queue;
std::thread t1([&queue]() {
for (int i = 0; ;++i) {
queue.push(i);
std::cout << "producer push:" << i << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
});
std::thread consumer1([&queue]() {
while(true){
auto ptr = queue.wait_and_pop();
std::cout << "consumer1 get:" << *ptr.get() << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
std::thread consumer2([&queue]() {
while(true){
int ret;
queue.try_pop(ret);
std::cout << "consumer2 get:" << ret << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
t1.join();
consumer1.join();
consumer2.join();
return 0;
}

异步并发

std::async

std::async使用异步的方式启动任务,从其返回值中获得std::future对象。默认情况下编译器会根据当前线程情况自动选择是否开启专属的线程执行任务。

不需要管理线程的生命周期。

可以指定第一个参数:std::lauch

std::lauch::deferred 在当前线程上延后调用任务函数,直到遇到在futrue上get/wait才会执行任务函数;

std::lauch::async 开启专属的线程,在新线程上运行任务函数;

默认情况下,使用std::lauch::deferred | std::lauch::async策略,由编译器选择执行情况

std::future

调用get()时,当前线程会阻塞,以便future准备妥当并返回值或者异常。get()只能被调用一次,因为它会移动或者消耗掉std::future对象的状态;

wait()也是一个阻塞调用,wait()不会返回任务结果,它只是等待异步任务完成。如果任务完成,wait()会立即返回,如果任务没有完成wait()会阻塞当前线程,直到任务完成。wait()可以被调用多次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
#include <iostream>
#include <future>
#include <thread>
int find_the_answer_to_ltuae(){
int a = 0;
for (int i = 0; i < 100;++i){
++a;
}
return a;
}
int main(){
std::future<int> future_result = std::async(std::launch::async, find_the_answer_to_ltuae);
std::cout << find_the_answer_to_ltuae() << std::endl;
std::cout << future_result.get() << std::endl;

std::packaged_task

连接future对象可调用函数

std::packaged_task是一个类模板,其模板参数是函数签名。例如:<void()> 代表了一个不接受参数和没有返回值的函数。

1、std::package_task<T> task (func)其中func是被包装的任务(函数),模板类型为函数的返回值;

2、通过std::future<T> f = task.get_future()获取与任务关联的future对象。

3、再通过f.get()来获取任务的返回值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#include <iostream>
#include <future>
#include <thread>
int find_the_answer_to_ltuae(){
int a = 0;
for (int i = 0; i < 100;++i){
++a;
}
return a;
}
int main(){
std::packaged_task<int()> task(find_the_answer_to_ltuae);
auto future_packged_result = task.get_future();
std::thread t1(std::move(task));
t1.join();
std::cout << future_packged_result.get() << std::endl;
return 0;
}

std::promise

std::promise用于在某一线程中设置某个值或异常,而std::future则用于在另一线程中获取这个值或异常。

set_value()设置值,在主线程通过std::future::get()获取设置的值,如果值未设置,将阻塞。

set_exception()设置异常, 设置的异常通过std::current_exception()获取。子线程设置的异常,主线程必须捕获这个设置的异常。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
#include <iostream>
#include <future>
#include <string>

void set_execption(std::promise<void> pro){
try{
throw std::runtime_error("An error occured");
}catch(...){
pro.set_exception(std::current_exception());
}
}

int main(int argc, char const *argv[])
{
// 设置值
std::promise<std::string> p;
std::future<std::string> f = p.get_future();
std::thread t1([](std::promise<std::string> p)
{ p.set_value("hello world"); }, std::move(p));
std::cout << f.get() << std::endl;
t1.join();
// 设置异常
std::promise<void> promiseException;
std::future<void> future = promiseException.get_future();
std::thread t2(set_execption, std::move(promiseException));
// 必须在主线程中捕获异常
try {
std::cout << "waiting for set exception" << std::endl;
// future.get()会重新获取异常
future.get();
}catch(const std::exception& e){
std::cout << "catch :" << e.what() << std::endl;
}
t2.join();
return 0;
}

std::shared_future

当多个线程需要等待同一个执行结果时,可以使用std::shared_future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
#include <iostream>
#include <future>
#include <thread>
int main(int argc, char const *argv[])
{
std::promise<int> promise;
std::shared_future<int> shared_f = promise.get_future();
// 在t1线程内设置一个值为10
std::thread t1([](std::promise<int> &&p){
std::this_thread::sleep_for(std::chrono::seconds(2));
p.set_value(10);
}, std::move(promise));
// 在t2线程内获取这个值,并加1
std::thread t2([shared_f](){
int m = shared_f.get();
std::cout << std::this_thread::get_id() << " num is" << m + 1 << std::endl;
});
// 在t3线程内获取这个值,并减去5
std::thread t3([shared_f](){
int m = shared_f.get();
std::cout << std::this_thread::get_id() << " num is" << m - 5 << std::endl;
});
t1.join();
t2.join();
t3.join();
return 0;
}

利用future进行函数式编程

函数式编程风格:把运算过程尽量写成一系列嵌套的函数调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
#include <iostream>
#include <algorithm>
#include <list>
#include <future>
#include <chrono>
#include <fstream>
#include <string>
#include <sstream>
using namespace std::chrono;

// 函数式编程的快速排序
template<typename T>
std::list<T> quick_sort(std::list<T> input){
if(input.empty())
return input;
// 定义结果列表
std::list<T> result;
result.splice(result.begin(), input, input.begin());
const T &pivot = *result.begin();
// 将input划分为两列
auto divide_point = std::partition(input.begin(), input.end(), [pivot](const T &it)
{ return it < pivot; });
std::list<T> lower_part;
// 将input的第一列取下,并插入到lower_part
lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
// 递归调用,通过对象移动move转移对象的所有权,避免拷贝
auto new_lower(quick_sort(std::move(lower_part)));
auto new_higher(quick_sort(std::move(input)));
result.splice(result.begin(), new_lower);
result.splice(result.end(), new_higher);
return result;
}

// 并行的快速排序
template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input, int depth = 0)
{
if (input.empty())
return input;
if (depth > 5)
{ // 控制递归深度以避免过多的线程开销
return quick_sort(std::move(input));
}
std::list<T> result;
result.splice(result.begin(), input, input.begin());
const T &pivot = *result.begin();
// 根据哨兵划分两侧
auto divide_point = std::partition(input.begin(), input.end(), [pivot](const T &it)
{ return it < pivot; });
// 将前一侧放到lower_part
std::list<T> lower_part;
lower_part.splice(lower_part.begin(), input, input.begin(), divide_point);
// 开启并发线程处理lower_part
std::future<std::list<T>> new_lower(std::async(&parallel_quick_sort<T>, std::move(lower_part), depth + 1));
auto new_higher(parallel_quick_sort(std::move(input), depth + 1));
result.splice(result.end(), new_higher);
result.splice(result.begin(), new_lower.get());
return result;
}

int main(){
std::ifstream file("random_numbers.txt");
std::list<int> list{};
std::string line;
while(std::getline(file, line)){
std::stringstream ss(line);
std::string num;
while(std::getline(ss, num, ' ')){
if(!num.empty())
list.push_back(std::stoi(num));
}
}
auto last = steady_clock::now();
auto list2 = quick_sort(list);
auto now = steady_clock::now();
std::cout << duration_cast<microseconds>((now - last)).count() << std::endl;
return 0;
}

原子操作

std::atomic_flag

最简单的原子类型标准, 表示一个布尔标志。具有两种状态:成立或置零。

必须由宏ATOMIC_FLAG_INIT初始化

clear():销毁

test_and_set:读取原有的值,并设置标志成立

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// 实现自旋锁
#include <iostream>
#include <thread>
#include <atomic>
#include <cassert>
class spinlock_mutex {
public:
spinlock_mutex():flag(ATOMIC_FLAG_INIT){}
void lock(){
while(flag.test_and_set(std::memory_order_acquire));
}
void unlock() { flag.clear(std::memory_order_release); }

private:
std::atomic_flag flag;
};
int main(int argc, char const *argv[])
{
spinlock_mutex spinlock;
std::thread t1([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3;++i){
std::cout << "*";
}
spinlock.unlock();
});
std::thread t2([&spinlock]() {
spinlock.lock();
for (int i = 0; i < 3;++i){
std::cout << "?";
}
spinlock.unlock();
});
t1.join();
t2.join();
return 0;
}

std::atomic<T>

is_lock_free()判断类型操作能由原子指令直接实现。

支持load()store()exchange()compare_exchange_weak()compare_exchange_strong()等操作。

内存次序

对原子类型上的每一种操作,都可以提供额外的参数,从枚举类std::memory_order取值

包括std::memory_order_relaxedstd:: memory_order_acquirestd::memory_order_consumestd::memory_order_acq_relstd::memory_order_releasestd::memory_order_seq_cst

  • 存储(store)操作,可选用的内存次序有std::memory_order_relaxedstd::memory_order_releasestd::memory_order_seq_cst

  • 载入(load)操作,可选用的内存次序有std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_seq_cst

  • “读-改-写”(read-modify-write)操作,可选用的内存次序有std::memory_order_relaxedstd::memory_order_consumestd::memory_order_acquirestd::memory_order_releasestd::memory_order_acq_relstd::memory_order_seq_cst

原子操作默认使用的是std::memory_order_seq_cst先后一致次序。

设计并发的数据结构

线程安全的并发栈

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
template<typename T>
class ThreadSafeStack {
public:
ThreadSafeStack(){}
ThreadSafeStack(const ThreadSafeStack &t){
std::lock_guard<std::mutex> lock(m_mtx);
m_stack = t.m_stack;
}
ThreadSafeStack &operator=(const ThreadSafeStack &) = delete;
void push(const T &value){
std::lock_guard<std::mutex> lock(m_mtx);
m_stack.push(value);
// 有数据入栈就通知
m_cv.notify_one();
}
// wait版本的pop
void wait_and_pop(T &value){
std::unique_lock<std::mutex> lock(m_mtx);
// 如果栈不为空才允许pop,否则就等待
cv.wait(lock,[this](){
return !m_stack.empty();
});
// 因为front()返回的是左值,赋值会调用拷贝赋值,转成右值,调用移动赋值
value = std::move(m_stack.front());
m_stack.pop();
}
// 智能指针版本的返回
std::shared_ptr<T> wait_and_pop(){
std::unique_lock<std::mutex> lock(m_mtx);
cv.wait(lock, [this](){
return !m_stack.empty();
});
std::shared_ptr<T> res = std::make_shared<T>(std::move(m_stack.front()));
m_stack.pop();
return
}
// try版本的pop
bool try_pop(T &value){
std::lock_guard<std::mutex> lock(m_mtx);
if(m_stack.empty())
return false;
value = std::move(m_stack.front());
m_stack.pop();
return true;
}
std::shared_ptr<T> try_pop(){
std::lock_guard<std::mutex> lock(m_mtx);
if(m_stack.empty())
return std::shared_ptr<T>();
std::shared_ptr<T> res = std::make_shared<T>(std::move(m_stack.front()));
m_stack.pop();
return res;
}
bool empty() const {
std::lock_guard<std::mutex> lock(m);
return data.empty();
}
private:
std::stack<T> m_stack;
std::mutex m_mtx;
std::condition_variable m_cv;
};

并发队列

单向链表可以充当队列的最简单的数据结构:

image-20240620151604250
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
template<typename T>
class ThreadSafeQueue {
private:
struct node{
std::shared_ptr<T> data;
std::unique_ptr<node> next;
};
node* get_tail(){
std::lock_guard<std::mutex> lock(tail_mutex);
return tail;
}
std::unique_ptr<node> pop_head(){
std::unique_ptr<node> old_head = std::move(head);
head = std::move(old_head->next);
return old_head;
}
std::unique_lock<std::mutex> wait_for_data(){
std::unique_lock<std::mutex> head_lock(head_mutex);
m_cv.wait(head_lock, [&]() {
return head.get()!= get_tail();
});
return head_lock;
}
std::unique_ptr<node> wait_pop_head(){
std::unique_lock<std::mutex> head_lock(wait_for_data());
return pop_head();
}
std::unique_ptr<node> wait_pop_head(T &value){
std::unique_lock<std::mutex> head_lock(wait_for_data());
value = std::move(*head->data);
return pop_head;
}
std::unique_lock<node> try_pop_head(){
std::lock_guard<std::mutex> lock(head_mutex);
if(head.get() == tail){
return std::unique_lock<node>();
}
return pop_head();
}
std::unique_lock<node> try_pop_head(T &value){
std::lock_guard<std::mutex> head_lock(head_mutex);
if(head.get() == tail){
return std::unique_lock<node>();
}
value = std::move(*head->data);
return pop_head();
}
public:
ThreadSafeQueue(): head(new node), tail(head.get()){}
ThreadSafeQueue(const ThreadSafeQueue &) = delete;
ThreadSafeQueue &operator=(const ThreadSafeQueue &) = delete;
void push(T new_value){
// 构建指向数据的指针
std::shared_ptr<T> new_ptr = std::make_shared<T>(std::move(new_value));
// 创建新的虚结点
std::unique_ptr<node> p(new node);
{
std::lock_guard<std::mutex> lock(tail_mutex);
tail->data = new_ptr;
node *new_tail = p.get();
tail->next = std::move(p);
tail = new_tail;
}
m_cv.notify_one();
}

std::shared_ptr<T> wait_and_pop(){
const std::unique_ptr<node> old_head = wait_pop_head();
return old_head->data;
}
void wait_and_pop(T &value){
const std::unique_ptr<node> old_head = wait_and_pop(value);
}
std::shared_ptr<T> try_pop(){
std::unique_ptr<node> old_head = try_pop_head();
return old_head ? old_head->data : std::shared_ptr<T>();
}
bool try_pop(T &value){
const std::unique_lock<node> old_head = try_pop_head(value);
return old_head != nullptr;
}
bool empty(){
std::lock_guard<std::mutex> lock(head_mutex);
return (head.get() == tail);
}
private:
std::mutex head_mutex;
std::mutex tail_mutex;
std::unique_ptr<node> head;
node *tail;
std::condition_variable m_cv;
};
std::mutex read_mtx;
void printInt(const std::string con, int i){
std::lock_guard<std::mutex> lock(read_mtx);
std::cout <<con<<" : "<< i << std::endl;
}
int main(int argc, char const *argv[])
{

ThreadSafeQueue<int> safe_que;
std::thread consumer1([&]() {
while(true){
std::shared_ptr<int> data = safe_que.wait_and_pop();
printInt("consumer1", *data);
}
});
std::thread consumer2([&]() {
while(true){
std::shared_ptr<int> data = safe_que.wait_and_pop();
printInt("consumer2", *data);
}
});
std::thread producer([&]() {
for (int i = 0; i < 100;++i){
safe_que.push(i);
}
});
consumer1.join();
consumer2.join();
producer.join();
return 0;
}

设计并发代码线程安全原则

考虑一个操作是否是线程安全的,需要注意以下几个关键点:

1. 共享资源访问

如果多个线程访问同一个资源(如变量、对象、文件、数据库等),且至少有一个线程在修改该资源,那么这个访问需要是线程安全的。

2. 原子性

确保操作是原子的,即不能被中断。一个操作要么完全执行,要么完全不执行。原子操作通常由硬件支持(如处理器的原子指令)或通过锁实现。

3. 可见性

确保一个线程对数据的修改对其他线程是可见的。这可以通过适当的内存屏障或同步机制来实现。缺乏可见性可能会导致一个线程看到旧的数据,而不是另一个线程刚刚写入的数据。

4. 有序性

确保操作按预期顺序执行。在多线程环境中,编译器和处理器可能会重新排序指令以优化性能,这可能导致线程间的非预期行为。同步机制可以强制执行正确的顺序。

线程池

1、task任务队列表示

2、封装回调函数

3、空闲时挂起,有任务后通知

线程池的局限性:

1、不适合具有顺序执行的任务;

2、任务之间强相关或者互斥性较强;

简易的线程池模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <string>
#include <vector>
#include <functional>
class ThreadPool{
public:
ThreadPool(int numThreads) : stop(false){
for (int i = 0; i < numThreads; ++i){
threads.emplace_back([this]{
while(1){
std::unique_lock<std::mutex> lock(mtx);
// 如果任务为或者stop为false就等待被唤醒
condition.wait(lock, [this]{ return !tasks.empty() || stop; });
if(stop && tasks.empty()){
return;
}
// 取任务
std::function<void()> task(std::move(tasks.front()));
tasks.pop();
// 解锁
lock.unlock();
// 执行任务
task();
} });
}
}
~ThreadPool(){
{
std::unique_lock<std::mutex> lock(mtx);
stop = true;
}
// 把任务队列任务取完
condition.notify_all();
// 等待任务完成
for(auto &t : threads){
t.join();
}
}
// 添加任务
template <class F, class... Args>
void enqueue(F &&f, Args &&...args){
std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
// 对tasks加锁
std::unique_lock<std::mutex> lock(mtx);
tasks.emplace(std::move(task));
lock.unlock();
// 通知
condition.notify_one();
}
private:
std::vector<std::thread> threads;
std::queue<std::function<void()>> tasks;
std::mutex mtx;
std::condition_variable condition;
bool stop;
};

int main(){
ThreadPool pool(4);
for (int i = 0; i < 10; ++i){
pool.enqueue([i]{
std::cout << "task : " << i << "is running" << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
std::cout << "task : " << i << "is done" << std::endl;
});
}
return 0;
}

线程池的单例模式实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
#include <iostream>
#include <thread>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <future>
#include <atomic>
#include <vector>
#include <functional>
#include <memory>

class ThreadPool {
using Task = std::packaged_task<void()>;

public:
ThreadPool(const ThreadPool &) = delete;
ThreadPool &operator=(ThreadPool &) = delete;
static ThreadPool& GetInstance(){
static ThreadPool t;
return t;
}
// 向任务队列中添加任务
template<typename T, typename ...Args>
auto commit(T func, Args&& ...args) -> std::future<decltype(func(args...))>{
using ReturnType = decltype(func(args...));
// 判断线程池是否停止
if(m_stop.load())
return std::future<ReturnType>();
// 创建一个任务
auto task = std::make_shared<std::packaged_task<ReturnType()>>(
std::bind(std::forward<T>(func), std::forward<Args>(args)...));
std::future<ReturnType> ret = task->get_future();
{
std::lock_guard<std::mutex> lock(m_mtx);
m_tasks.emplace([task]{
(*task)();
});
}
m_cv.notify_one();
return ret;
}
private:
// 单例模式下的初始构造函数必须是private
ThreadPool(unsigned int num = 4) : m_stop(false){
// 确定启动线程数
if(num < 1)
m_thread_num = 1;
else
m_thread_num = num;
// 初始化线程
init_thread();
}
void init_thread(){
for (int i = 0; i < m_thread_num;++i){
m_thread_pool.emplace_back([this] {
while(!this->m_stop.load()){
Task task;
{
std::unique_lock<std::mutex> lock(m_mtx);
// 线程挂起,等待任务
// std::cout << " thread is waiting: " << std::this_thread::get_id() << std::endl;
m_cv.wait(lock, [this]{
// 唤醒时直到线程池结束,或者任务池中有任务取消挂起
return m_stop.load() || !m_tasks.empty();
});
// 再次判断任务队列是否为空,看是否是stop导致的唤醒
if(m_tasks.empty()){
return;
}
// 取出任务
task = std::move(m_tasks.front());
m_tasks.pop();
}
// 执行任务
--m_thread_num;
std::cout << "thread " << std::this_thread::get_id() << "is getting task : " << std::endl;
task();
++m_thread_num;
}
});
}
}
~ThreadPool(){
// 不允许执行新的任务
m_stop = true;
// 等待正在执行的所有任务
m_cv.notify_all();
for (auto &it: m_thread_pool){
if(it.joinable()){
std::cout << "join thread" << it.get_id() << std::endl;
it.join();
}
}
}
private:
std::queue<std::packaged_task<void()>> m_tasks; // 任务队列
std::vector<std::thread> m_thread_pool; // 线程池
std::mutex m_mtx; // 队列锁
std::condition_variable m_cv; // 通知线程池取任务条件变量
std::atomic_bool m_stop; // 线程结束标志
std::atomic_int m_thread_num;
};

int main(int argc, char const *argv[])
{
int num = 0;
std::this_thread::sleep_for(std::chrono::seconds(2));
ThreadPool::GetInstance().commit([](int &num){
num = 10;
std::cout << "into thread num :" << num << std::endl;

}, std::ref(num));
std::this_thread::sleep_for(std::chrono::seconds(2));
std::cout << "num is :" << num << std::endl;
return 0;
}

实战

一个异步日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
#include <iostream>
#include <string>
#include <queue>
#include <any>
#include <sstream>
#include <memory>
#include <condition_variable>
#include <thread>
#include <chrono>
#include <ctime>
#include <iomanip>
enum LogLevel {
DEBUG = 0,
INFO = 1,
WARNING = 2,
ERROR = 3
};
static void currentTime(){
auto now = std::chrono::system_clock::now();
std::time_t now_time_t = std::chrono::system_clock::to_time_t(now);
std::tm *now_tm = std::localtime(&now_time_t);
std::cout << std::put_time(now_tm, "[%y-%m-%d %H:%M:%S]");
}
class LogTask{
public:
LogTask(){}
LogTask(const LogTask &log)
: m_level(log.m_level), m_logData(log.m_logData){};
LogTask(const LogTask &&log)
: m_level(log.m_level), m_logData(std::move(log.m_logData)){};
public:
LogLevel m_level; // 记录日志等级c
std::queue<std::any> m_logData; // 日志数据
};

class AsyncLogger{
private:
AsyncLogger():m_stop(false){
m_logThread = std::thread([this] () {
while(true){
// 不断的从日志队列中取消息打印,如果没有消息就挂起
std::unique_lock<std::mutex> lock(m_mtx);
m_cv.wait(lock, [this]() {
return !m_logQueue.empty() || m_stop;
});
// 停止就取消消息循环,同时也要保证消息队列为空
if(m_stop && m_logQueue.empty())
return;
// 否则取消息
auto logMsg = m_logQueue.front();
m_logQueue.pop();
lock.unlock();
// 打印消息
printLogTask(logMsg);
}
});
}
AsyncLogger(const AsyncLogger &) = delete;
AsyncLogger &operator=(const AsyncLogger &) = delete;
~AsyncLogger(){
m_stop = true;
m_cv.notify_all();
if(m_logThread.joinable())
m_logThread.join();
currentTime();
std::cout << " Exit Logger Success!";
}
// 打印日志函数
void printLogTask(std::shared_ptr<LogTask> logTaskPtr){
currentTime();
std::cout << "[" << logLevelToString(logTaskPtr->m_level) << "]";
// 从logTask中的queue取消息打印
while(!logTaskPtr->m_logData.empty()){
try{
// 将any转化为string类型
std::cout<<convertToString(logTaskPtr->m_logData.front());
}catch(const std::bad_any_cast&){
// 处理不能转化的变量类型异常
std::cout << "(Invalid Log Data Type)";
}
logTaskPtr->m_logData.pop();
}
std::cout << std::endl;
}
// 转换any类型数据
std::string convertToString(const std::any &data){
// 只处理三种类型, 其余情况抛出异常
std::ostringstream oss;
if(data.type() == typeid(int))
oss << std::any_cast<int>(data);
else if(data.type() == typeid(std::string))
oss << std::any_cast<std::string>(data);
else if(data.type() == typeid(const char*))
oss << std::any_cast<const char *>(data);
else
throw std::bad_any_cast();
return oss.str();
}
// 日志等级到字符串映射
const char* logLevelToString(LogLevel level){
switch(level){
case DEBUG:
return "DEBUG";
case INFO:
return "INFO";
case WARNING:
return "WARNING";
case ERROR:
return "ERROR";
default:
return "UNKNOW TYPE";
}
}
public:
// 单例模式
static AsyncLogger& GetInstance(){
static AsyncLogger log;
return log;
}
// 写异步日志
template<typename... Args>
void AsyncWrite(LogLevel logLevel, Args&&... args){
std::shared_ptr<LogTask> task = std::make_shared<LogTask>();
task->m_level = logLevel;
// 利用折叠表达式将消息入队
(task->m_logData.push(args), ...);
std::unique_lock<std::mutex> lock(m_mtx);
bool notify = m_logQueue.empty();
m_logQueue.push(task);
// 判断是否是第一个消息
lock.unlock();
if(notify){
m_cv.notify_one();
}
}
// 写日志的特化版本
template<typename... Args>
void AsyncWriteInfo(Args&&...args){
AsyncWrite(INFO, std::forward<Args>(args)...);
}
template<typename... Args>
void AsyncWriteError(Args&&...args){
AsyncWrite(ERROR, std::forward<Args>(args)...);
}
template<typename... Args>
void AsyncWriteDebug(Args&&...args){
AsyncWrite(DEBUG, std::forward<Args>(args)...);
}
template<typename... Args>
void AsyncWriteWarning(Args&&...args){
AsyncWrite(WARNING, std::forward<Args>(args)...);
}
private:
std::queue<std::shared_ptr<LogTask>> m_logQueue; // 日志队列
std::condition_variable m_cv;
std::mutex m_mtx; // 互斥访问日志队列
bool m_stop; // 停止
std::thread m_logThread; // 日志工作线程
};

void func_error(){
for(int i = 0 ;i<100;++i){
AsyncLogger::GetInstance().AsyncWriteError(i);
}
}
void func_info(){
for(int i = 0;i<100;++i){
AsyncLogger::GetInstance().AsyncWriteInfo(i);
}
}

int main(int argc, char const *argv[])
{
AsyncLogger::GetInstance().AsyncWrite(LogLevel::INFO, 3.12, "HELLO", 321);
AsyncLogger::GetInstance().AsyncWrite(LogLevel::ERROR, "INPUT ERROR");
AsyncLogger::GetInstance().AsyncWriteError("haha this is a error", 132.123);
std::thread t1(func_error);
std::thread t2(func_info);
t1.join();
t2.join();
return 0;
}

C++并发编程
http://example.com/2024/07/31/concurrency/
作者
John Doe
发布于
2024年7月31日
许可协议