"In short: the palest ink beats the best memory — write down everything you tend to forget!"
Creating Threads
std::thread
The thread function can be a free function, a member function, a function object, or a lambda expression.
Free functions
```cpp
std::thread(func, args...)
```
```cpp
void printHelloWorld(const std::string &msg){
    std::cout << msg;
}

int main(){
    std::thread thread1(printHelloWorld, "hello world");
    thread1.join();
    return 0;
}
```
Non-static member functions of a class
```cpp
A a;
std::thread t(&A::func, &a);
```
Note: std::thread itself is move-only and cannot be copied; the arguments passed to its constructor are copied (or moved) into the new thread's own storage.
1. join() — blocks the calling thread and waits for the child thread to finish.
joinable() reports whether the thread object can be joined, returning false if it cannot; calling join() on a thread that has already been joined is erroneous behavior.
2. detach() — detaches the child thread from the parent thread.
If neither join() nor detach() has been called by the time the std::thread object is destroyed, the std::thread destructor calls std::terminate() and aborts the whole program.
Getting the thread ID
A thread ID has type std::thread::id and can be obtained by calling std::this_thread::get_id().
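A small sketch of comparing thread IDs (the helper name is illustrative): the ID observed inside the child thread differs from the parent's, and a default-constructed std::thread::id represents "no thread".

```cpp
#include <thread>

// Returns true if the spawned thread observes an id that is
// distinct from the caller's and from the "no thread" id.
bool child_has_distinct_id() {
    std::thread::id parent_id = std::this_thread::get_id();
    std::thread::id child_id;
    std::thread t([&child_id] { child_id = std::this_thread::get_id(); });
    t.join();  // child_id is safely visible after join()
    return child_id != parent_id && child_id != std::thread::id{};
}
```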
std::thread source-code analysis
Undefined-data errors / problems when passing arguments to threads
1. Passing a temporary function object makes the construction parse as a function declaration (C++'s "most vexing parse")
E.g. std::thread my_thread(background_task()); is parsed as the declaration of a function named my_thread rather than the creation of a thread.
Solutions:
```cpp
std::thread t((background_task()));  // extra parentheses
std::thread t{background_task()};    // uniform initialization
```
2. Passing a temporary while the parameter is a reference
The temporary is destroyed after being passed, and the dangling reference leads to undefined behavior.
Solutions:
- Pass the argument through a smart pointer: the reference count grows with each copy, guaranteeing the object stays alive for as long as it is used — the so-called "pseudo-closure" idiom.
- Pass the local variable by value, at the cost of the time and space of a copy.
3. Passing a pointer or reference to memory that has already been freed.
4. The entry function is a private member function of a class.
Solution: grant access, e.g. by declaring the launching function a friend.
5. Transferring thread ownership
Thread ownership cannot be moved into a std::thread object that is already associated with a running thread.
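Ownership transfer uses std::move; a minimal sketch (the function name and the value 42 are illustrative):

```cpp
#include <thread>
#include <utility>
#include <cassert>

// t1 gives up its thread; after the move it is no longer joinable.
int run_moved_thread() {
    int result = 0;
    std::thread t1([&result] { result = 42; });
    std::thread t2 = std::move(t1);   // ownership transferred to t2
    // t1 is now empty. Moving another running thread into t2 while it
    // is still joinable would call std::terminate().
    assert(!t1.joinable());
    t2.join();
    return result;
}
```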
Waiting for a thread to finish when exceptions may be thrown
If the parent thread throws an exception, the child's join() may never execute, so the child thread cannot run to completion (and the std::thread destructor will terminate the program).
```cpp
void f(){
    std::thread t(my_func);
    try {
        do_something_in_current_thread();
    } catch(...) {
        t.join();
        throw;
    }
    t.join();
}
```
Waiting for a thread to finish via RAII
```cpp
class thread_guard {
public:
    explicit thread_guard(std::thread &_t) : t(_t) {}
    thread_guard(const thread_guard &) = delete;
    thread_guard &operator=(const thread_guard &) = delete;
    ~thread_guard(){
        if(t.joinable()){
            t.join();
        }
    }
private:
    std::thread &t;
};
```
Controlling the number of running threads
std::thread::hardware_concurrency() returns the number of threads that can truly run concurrently in a given execution; on a multi-core system this is typically the number of CPU cores.
```cpp
#include <iostream>
#include <thread>
#include <algorithm>
#include <numeric>
#include <vector>
#include <chrono>

template<typename Iterator, typename T>
struct accumulate_block{
    void operator()(Iterator first, Iterator last, T& result){
        result = std::accumulate(first, last, result);
    }
};

template<typename Iterator, typename T>
T parallel_accumulate(Iterator first, Iterator last, T init){
    const auto length = std::distance(first, last);
    if(!length)
        return init;
    const long min_per_thread = 25;
    const long max_threads = (length + min_per_thread - 1) / min_per_thread;
    const long hardware_threads = std::thread::hardware_concurrency();
    const long num_threads = std::min(hardware_threads != 0 ? hardware_threads : 2, max_threads);
    const long block_size = length / num_threads;
    std::vector<T> results(num_threads);
    std::vector<std::thread> threads(num_threads - 1);  // the caller handles the last block
    Iterator block_start = first;
    for (long i = 0; i < num_threads - 1; ++i){
        Iterator block_end = block_start;
        std::advance(block_end, block_size);
        threads[i] = std::thread(accumulate_block<Iterator, T>(),
                                 block_start, block_end, std::ref(results[i]));
        block_start = block_end;
    }
    accumulate_block<Iterator, T>()(block_start, last, results[num_threads - 1]);
    for(auto& entry : threads){
        entry.join();
    }
    return std::accumulate(results.begin(), results.end(), init);
}

int main(int argc, char const *argv[]){
    std::vector<int> a;
    for (int i = 0; i < 10000000; ++i){
        a.push_back(1);
    }
    auto beforeTime = std::chrono::steady_clock::now();
    parallel_accumulate(a.begin(), a.end(), 0);
    auto afterTime = std::chrono::steady_clock::now();
    std::cout << std::chrono::duration<double>(afterTime - beforeTime).count() << std::endl;

    beforeTime = std::chrono::steady_clock::now();
    std::accumulate(a.begin(), a.end(), 0);
    afterTime = std::chrono::steady_clock::now();
    std::cout << std::chrono::duration<double>(afterTime - beforeTime).count() << std::endl;

    beforeTime = std::chrono::steady_clock::now();
    std::reduce(a.begin(), a.end(), 0);  // C++17
    afterTime = std::chrono::steady_clock::now();
    std::cout << std::chrono::duration<double>(afterTime - beforeTime).count() << std::endl;
    return 0;
}
```
Storing std::thread objects in a container
When storing threads in a container such as std::vector: std::thread has no copy constructor, so push_back requires explicitly constructing a temporary std::thread that is then moved into the container.
emplace_back instead constructs the std::thread directly inside the container, saving one move construction.
```cpp
class my_thread{
public:
    void operator()() const {}
};

int main(){
    std::vector<std::thread> vec;
    std::thread t1((my_thread()));
    t1.join();
    vec.push_back(std::thread(my_thread()));
    vec.emplace_back(my_thread());
    for(auto &t : vec){
        t.join();
    }
    return 0;
}
```
std::jthread
Automatic joining (join on destruction):
std::jthread calls join() automatically in its destructor, ensuring the thread finishes before the object is destroyed. This avoids common resource leaks and unhandled thread-termination problems.
- By contrast, std::thread calls std::terminate() in its destructor if the thread is still joinable.
Passing a stop token:
std::jthread provides a mechanism for passing a std::stop_token to the thread function so that the thread can be asked to stop. A constructor overload creates a std::stop_source and std::stop_token pair and hands the token to the thread function.
- This makes thread management and cancellation more convenient and safer.
Better RAII support:
- Because std::jthread manages the thread's lifetime automatically (auto-join), it follows the RAII (Resource Acquisition Is Initialization) principle more closely and reduces the chance of mistakes.
Data Sharing
std::mutex
Based on the notion of a critical section: it creates a mutually exclusive block of code.
1. lock() — acquire the lock
2. unlock() — release the lock
std::lock
Locks two (or more) mutexes at the same time without deadlock.
std::timed_mutex
A mutex that supports timed locking; use it when you need to wait for the lock with a timeout.
std::lock_guard
Locks on construction and unlocks on destruction; intended for use within a local scope only.
```cpp
while(true){
    {
        std::lock_guard<std::mutex> lock(mtx);
        --shared_data;
        std::cout << "current thread is " << std::this_thread::get_id() << std::endl;
        std::cout << "shared_data is " << shared_data << std::endl;
    }
    std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
```
Passing std::adopt_lock as the second argument means the mutex was already locked before the guard was constructed — the lock is "adopted", and the std::lock_guard then manages it.
```cpp
std::lock(mtx1, mtx2);
std::lock_guard<std::mutex> guard1(mtx1, std::adopt_lock);
std::lock_guard<std::mutex> guard2(mtx2, std::adopt_lock);
```
std::scoped_lock (C++17)
Another way to lock and manage several mutexes in a single operation.
```cpp
std::scoped_lock lock(mtx1, mtx2);
```
std::unique_lock
Flexible locking of a mutex: locks on construction and unlocks on destruction.
Passing std::adopt_lock as the second argument makes the object manage a mutex that is already locked;
std::defer_lock leaves the mutex unlocked when construction completes, deferring the locking.
Member functions:
lock() — acquire the lock explicitly;
unlock() — release the lock explicitly;
owns_lock() — whether this object currently owns the lock;
try_lock_for() — wait for a duration and give up if the lock cannot be acquired in time;
try_lock_until() — wait until a time point and give up if the lock has not been acquired by then;
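A minimal sketch combining std::defer_lock, std::lock and owns_lock() (the function name is illustrative):

```cpp
#include <mutex>

// Construct both locks deferred, then acquire them together with
// std::lock; owns_lock() reports the state before and after.
bool defer_lock_demo() {
    std::mutex m1, m2;
    std::unique_lock<std::mutex> l1(m1, std::defer_lock);
    std::unique_lock<std::mutex> l2(m2, std::defer_lock);
    bool before = l1.owns_lock();   // false: not locked yet
    std::lock(l1, l2);              // deadlock-free locking of both mutexes
    bool after = l1.owns_lock() && l2.owns_lock();
    return !before && after;        // both unlock automatically on destruction
}
```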
std::shared_mutex (C++17)
Provides lock() and try_lock(), which acquire the exclusive (writer) lock.
Provides lock_shared() and try_lock_shared(), which acquire the shared (reader) lock.
While a std::shared_mutex is held exclusively, other threads that try to acquire it block until it is unlocked.
Use std::shared_lock<> to manage the shared lock; the exclusive lock is managed with std::lock_guard or std::unique_lock.
Why read operations need the shared lock:
1. a read must be sure no write is in progress, to avoid reading partially updated data;
2. reads may proceed concurrently with one another;
std::shared_timed_mutex (C++14)
- Like std::shared_mutex, it provides lock() and try_lock(), and it additionally provides the timed try_lock_for() and try_lock_until() for the exclusive lock.
- It also provides lock_shared() and try_lock_shared(), plus timed variants (try_lock_shared_for() and try_lock_shared_until()) for the shared lock.
- As with std::shared_mutex, while std::shared_timed_mutex is locked exclusively, other threads trying to acquire it block until it is unlocked. The timed variants, however, wait only up to the given deadline; if the lock still has not been acquired when the timeout expires, the call returns failure.
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <map>
#include <shared_mutex>

class DNSService {
public:
    DNSService(){}
    std::string QueryDns(std::string dnsName){
        std::shared_lock<std::shared_mutex> lock(m_shared_mtx);  // reader (shared) lock
        auto iter = dns_info.find(dnsName);
        if(iter != dns_info.end()){
            return iter->second;
        }
        return "";
    }
    void AddDnsInfo(std::string dnsName, std::string dnsEntry){
        std::lock_guard<std::shared_mutex> lock(m_shared_mtx);   // writer (exclusive) lock
        dns_info.insert(std::make_pair(dnsName, dnsEntry));
    }
private:
    std::map<std::string, std::string> dns_info;
    mutable std::shared_mutex m_shared_mtx;
};

int main(int argc, char const *argv[]){
    DNSService dns;
    std::thread t1([&dns]() {
        dns.AddDnsInfo("www.haha.com", "127.0.0.1");
    });
    std::thread t2([&dns]() {
        std::cout << dns.QueryDns("www.haha.com") << std::endl;
    });
    t1.join();
    t2.join();
    return 0;
}
```
Recursive locks
If a function acquires a lock and then calls another function that acquires the same lock, the thread deadlocks on itself. A recursive lock avoids this.
std::recursive_mutex allows the same thread to lock repeatedly, but this is not recommended — the design should avoid recursive locking in the first place.
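When recursive locking cannot be designed away, std::recursive_mutex lets the same thread re-lock a mutex it already holds; a sketch with hypothetical inner()/outer() helpers:

```cpp
#include <mutex>

std::recursive_mutex rec_mtx;

// inner() re-locks the mutex already held by outer(); with
// std::recursive_mutex this succeeds instead of self-deadlocking.
int inner() {
    std::lock_guard<std::recursive_mutex> lock(rec_mtx);
    return 1;
}

int outer() {
    std::lock_guard<std::recursive_mutex> lock(rec_mtx);
    return inner() + 1;  // same thread acquires the lock a second time
}
```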
A thread-safe stack
```cpp
template<typename T>
class safe_stack{
public:
    safe_stack(){}
    safe_stack(const safe_stack &other){
        std::lock_guard<std::mutex> lock(other.mtx);
        stk = other.stk;
    }
    safe_stack &operator=(const safe_stack &) = delete;
    void push(T new_value){
        std::lock_guard<std::mutex> lock(mtx);
        stk.push(std::move(new_value));
    }
    std::shared_ptr<T> pop(){
        std::lock_guard<std::mutex> lock(mtx);
        if(stk.empty())
            std::cerr << "pop error";
        std::shared_ptr<T> ptr = std::make_shared<T>(stk.top());
        stk.pop();
        return ptr;
    }
    void pop(T& value){
        std::lock_guard<std::mutex> lock(mtx);
        if(stk.empty())
            std::cerr << "pop error";
        value = stk.top();
        stk.pop();
    }
private:
    mutable std::mutex mtx;  // mutable so the copy constructor can lock a const `other`
    std::stack<T> stk;
};
```
Thread-safe singleton patterns
Singleton via a local static variable
```cpp
class SingleInstance{
private:
    SingleInstance(){}
    SingleInstance(const SingleInstance &) = delete;
    SingleInstance& operator=(SingleInstance &) = delete;
public:
    static SingleInstance& GetInstance(){
        static SingleInstance single;  // initialization is thread-safe since C++11
        return single;
    }
};
```
Eager-initialization ("hungry") singleton
```cpp
class SingleHungryInstance {
private:
    SingleHungryInstance(){}
    SingleHungryInstance(const SingleHungryInstance &) = delete;
    SingleHungryInstance &operator=(SingleHungryInstance &) = delete;
public:
    static SingleHungryInstance* GetInstance(){
        if(single == nullptr){   // effectively dead: the static below is created eagerly
            single = new SingleHungryInstance;
        }
        return single;
    }
private:
    static SingleHungryInstance *single;
};

// the instance is created eagerly, before main() starts
SingleHungryInstance* SingleHungryInstance::single = new SingleHungryInstance;
```
Lazy-initialization singleton (double-checked locking)
```cpp
class SingleLazyInstance {
private:
    SingleLazyInstance(){}
    SingleLazyInstance(const SingleLazyInstance &) = delete;
    SingleLazyInstance &operator=(SingleLazyInstance &) = delete;
public:
    static SingleLazyInstance* GetInstance(){
        if(single != nullptr){    // first check, without the lock
            return single;
        }
        m_mtx.lock();
        if(single != nullptr){    // second check, with the lock held
            m_mtx.unlock();
            return single;
        }
        single = new SingleLazyInstance();
        m_mtx.unlock();
        return single;
    }
private:
    static SingleLazyInstance *single;
    static std::mutex m_mtx;
};

SingleLazyInstance *SingleLazyInstance::single = nullptr;
std::mutex SingleLazyInstance::m_mtx;
```
Singleton via std::call_once
std::call_once guarantees the callable runs exactly once; each std::once_flag instance corresponds to one distinct one-time initialization.
```cpp
class SingleLazyInstance2 {
private:
    SingleLazyInstance2(){}
    SingleLazyInstance2(const SingleLazyInstance2 &) = delete;
    SingleLazyInstance2 &operator=(SingleLazyInstance2 &) = delete;
public:
    static SingleLazyInstance2* GetInstance(){
        std::call_once(initFlag, []() {
            single = new SingleLazyInstance2;
        });
        return single;
    }
private:
    static SingleLazyInstance2 *single;
    static std::once_flag initFlag;
};

SingleLazyInstance2 *SingleLazyInstance2::single = nullptr;
std::once_flag SingleLazyInstance2::initFlag;
```
A singleton template class implemented with std::call_once
// TODO
Deadlock
Guidelines for preventing deadlock
If you already hold a lock, do not try to acquire a second one. When several locks are needed, use std::lock()/std::scoped_lock to acquire them all in a single action.
Passing data between threads
1. via shared variables
2. via function arguments
3. via std::promise
```cpp
#include <iostream>
#include <future>
#include <thread>

void func(std::promise<int> &f){
    f.set_value(1000);
}

int main(int argc, char const *argv[]){
    std::promise<int> f;
    std::future<int> future_res = f.get_future();
    std::thread t1(func, std::ref(f));
    t1.join();
    std::cout << future_res.get() << std::endl;
    return 0;
}
```
Synchronizing Concurrent Operations
std::condition_variable
Member functions:
void wait(std::unique_lock<std::mutex>& lock, Predicate pred);
wait blocks the current thread until the condition variable is notified or a spurious wakeup occurs; the mutex is unlocked while waiting.
Spurious wakeup: the thread reacquires the mutex and re-checks the condition without having been notified by another thread (the wakeup may come from the operating system). The predicate overload is equivalent to:
```cpp
while(!pred())
    wait(lock);
```
While the predicate returns false, the thread blocks and waits, with the mutex unlocked;
when the predicate returns true, wait() returns and the mutex is locked again.
wait_for()
Blocks the current thread until the condition variable is notified or the given duration elapses.
wait_until()
Blocks the current thread until the condition variable is notified or the given time point is reached.
notify_one()
Wakes one waiting thread.
notify_all()
Wakes all waiting threads.
Printing messages in turn with condition variables
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <chrono>

std::mutex mtx;
std::condition_variable cv_a;
std::condition_variable cv_b;
int num = 1;

void do_printA(int &num){
    while(true){
        std::unique_lock<std::mutex> lock(mtx);
        cv_a.wait(lock, [&num](){ return num == 1; });
        ++num;
        std::cout << "current thread num : " << num << std::endl;
        lock.unlock();
        cv_b.notify_one();
    }
}

void do_printB(int &num){
    while(true){
        std::unique_lock<std::mutex> lock(mtx);
        cv_b.wait(lock, [&num](){ return num == 2; });
        --num;
        lock.unlock();
        std::cout << "current thread num : " << num << std::endl;
        cv_a.notify_one();
    }
}

int main(int argc, char const *argv[]){
    std::thread t1(do_printA, std::ref(num));
    std::thread t2(do_printB, std::ref(num));
    t1.join();
    t2.join();
    return 0;
}
```
A producer–consumer model with condition variables
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <queue>
#include <chrono>

std::mutex g_mutex;
std::condition_variable g_cv;
std::queue<int> g_queue;

void Producer() {
    for (int i = 0; i < 10; i++) {
        {
            std::unique_lock<std::mutex> lock(g_mutex);
            g_queue.push(i);
            std::cout << "Producer: produced " << i << std::endl;
        }
        g_cv.notify_one();
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }
}

void Consumer() {
    while (true) {
        std::unique_lock<std::mutex> lock(g_mutex);
        g_cv.wait(lock, []() { return !g_queue.empty(); });
        int value = g_queue.front();
        g_queue.pop();
        std::cout << "Consumer: consumed " << value << std::endl;
    }
}

int main() {
    std::thread producer_thread(Producer);
    std::thread consumer_thread(Consumer);
    producer_thread.join();
    consumer_thread.join();
    return 0;
}
```
A thread-safe queue built on condition variables
```cpp
#include <iostream>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <memory>
#include <chrono>

template<typename T>
class ThreadSafeQueue{
public:
    ThreadSafeQueue(){}
    ThreadSafeQueue(const ThreadSafeQueue& t){
        std::lock_guard<std::mutex> lock(t.m_mtx);  // lock the source while copying
        m_queue = t.m_queue;
    }
    ThreadSafeQueue &operator=(ThreadSafeQueue &) = delete;
    void push(T new_value){
        std::lock_guard<std::mutex> lock(m_mtx);
        m_queue.push(new_value);
        m_cv.notify_one();
    }
    void wait_and_pop(T &value){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this]() { return !m_queue.empty(); });
        value = m_queue.front();
        m_queue.pop();
    }
    std::shared_ptr<T> wait_and_pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this]() { return !m_queue.empty(); });
        auto ptr = std::make_shared<T>(m_queue.front());
        m_queue.pop();
        return ptr;
    }
    bool try_pop(T &value){
        std::lock_guard<std::mutex> lock(m_mtx);
        if(m_queue.empty())
            return false;
        value = m_queue.front();
        m_queue.pop();
        return true;
    }
    std::shared_ptr<T> try_pop(){
        std::lock_guard<std::mutex> lock(m_mtx);
        if(m_queue.empty())
            return std::shared_ptr<T>();  // empty pointer signals an empty queue
        auto ptr = std::make_shared<T>(m_queue.front());
        m_queue.pop();
        return ptr;
    }
    bool empty() const{
        std::lock_guard<std::mutex> lock(m_mtx);
        return m_queue.empty();
    }
private:
    mutable std::mutex m_mtx;
    std::queue<T> m_queue;
    std::condition_variable m_cv;
};

int main(int argc, char const *argv[]){
    ThreadSafeQueue<int> queue;
    std::thread t1([&queue]() {
        for (int i = 0; ; ++i) {
            queue.push(i);
            std::cout << "producer push:" << i << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));
        }
    });
    std::thread consumer1([&queue]() {
        while(true){
            auto ptr = queue.wait_and_pop();
            std::cout << "consumer1 get:" << *ptr.get() << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    });
    std::thread consumer2([&queue]() {
        while(true){
            int ret;
            queue.try_pop(ret);
            std::cout << "consumer2 get:" << ret << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(500));
        }
    });
    t1.join();
    consumer1.join();
    consumer2.join();
    return 0;
}
```
Asynchronous Concurrency
std::async
std::async launches a task asynchronously; its result is obtained through the std::future it returns. By default the implementation decides, based on the current situation, whether to run the task on a dedicated thread.
There is no thread lifetime for you to manage.
An optional first argument of type std::launch selects the policy:
std::launch::deferred — run the task lazily on the calling thread, only when get()/wait() is called on the future;
std::launch::async — run the task on its own dedicated new thread;
by default the policy is std::launch::deferred | std::launch::async, and the implementation chooses between them.
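The difference between the policies can be observed directly; a small sketch (the function name is made up):

```cpp
#include <future>
#include <cassert>

// With std::launch::deferred the task runs lazily, on the calling
// thread, only when get()/wait() is reached.
int deferred_demo() {
    int side_effect = 0;
    auto fut = std::async(std::launch::deferred, [&side_effect] {
        side_effect = 1;
        return 10;
    });
    // Nothing has run yet under the deferred policy.
    assert(side_effect == 0);
    int v = fut.get();  // the task executes here, on this thread
    return v + side_effect;
}
```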
std::future
get() blocks the current thread until the future is ready and then returns the value or rethrows the stored exception. get() may only be called once, because it moves out and consumes the std::future's shared state;
wait() is also a blocking call, but it returns no result — it only waits for the asynchronous task to finish. If the task is already done, wait() returns immediately; otherwise it blocks the current thread until completion. wait() may be called multiple times.
```cpp
#include <iostream>
#include <future>
#include <thread>

int find_the_answer_to_ltuae(){
    int a = 0;
    for (int i = 0; i < 100; ++i){
        ++a;
    }
    return a;
}

int main(){
    std::future<int> future_result = std::async(std::launch::async, find_the_answer_to_ltuae);
    std::cout << find_the_answer_to_ltuae() << std::endl;
    std::cout << future_result.get() << std::endl;
    return 0;
}
```
std::packaged_task
Connects a future object to a callable.
std::packaged_task is a class template whose template parameter is a function signature; e.g. <void()> denotes a function that takes no arguments and returns nothing.
1. std::packaged_task<T()> task(func), where func is the wrapped task (callable) and T is its return type;
2. std::future<T> f = task.get_future() obtains the future associated with the task;
3. f.get() then retrieves the task's return value.
```cpp
#include <iostream>
#include <future>
#include <thread>

int find_the_answer_to_ltuae(){
    int a = 0;
    for (int i = 0; i < 100; ++i){
        ++a;
    }
    return a;
}

int main(){
    std::packaged_task<int()> task(find_the_answer_to_ltuae);
    auto future_packged_result = task.get_future();
    std::thread t1(std::move(task));
    t1.join();
    std::cout << future_packged_result.get() << std::endl;
    return 0;
}
```
std::promise
std::promise sets a value or an exception in one thread, while the paired std::future retrieves it in another.
set_value() stores the value; the other thread fetches it with std::future::get(), which blocks until the value has been set.
set_exception() stores an exception, typically captured with std::current_exception(); the thread that calls get() must catch the exception set by the other thread.
```cpp
#include <iostream>
#include <future>
#include <string>
#include <thread>

void set_exception(std::promise<void> pro){
    try{
        throw std::runtime_error("An error occurred");
    }catch(...){
        pro.set_exception(std::current_exception());
    }
}

int main(int argc, char const *argv[]){
    std::promise<std::string> p;
    std::future<std::string> f = p.get_future();
    std::thread t1([](std::promise<std::string> p) {
        p.set_value("hello world");
    }, std::move(p));
    std::cout << f.get() << std::endl;
    t1.join();

    std::promise<void> promiseException;
    std::future<void> future = promiseException.get_future();
    std::thread t2(set_exception, std::move(promiseException));
    try {
        std::cout << "waiting for set exception" << std::endl;
        future.get();
    }catch(const std::exception& e){
        std::cout << "catch :" << e.what() << std::endl;
    }
    t2.join();
    return 0;
}
```
std::shared_future
When multiple threads need to wait for the same result, use std::shared_future.
```cpp
#include <iostream>
#include <future>
#include <thread>
#include <chrono>

int main(int argc, char const *argv[]){
    std::promise<int> promise;
    std::shared_future<int> shared_f = promise.get_future();
    std::thread t1([](std::promise<int> &&p){
        std::this_thread::sleep_for(std::chrono::seconds(2));
        p.set_value(10);
    }, std::move(promise));
    std::thread t2([shared_f](){
        int m = shared_f.get();
        std::cout << std::this_thread::get_id() << " num is " << m + 1 << std::endl;
    });
    std::thread t3([shared_f](){
        int m = shared_f.get();
        std::cout << std::this_thread::get_id() << " num is " << m - 5 << std::endl;
    });
    t1.join();
    t2.join();
    t3.join();
    return 0;
}
```
Functional programming with futures
Functional style: write the computation as a series of nested function calls, avoiding shared mutable state.
```cpp
#include <iostream>
#include <algorithm>
#include <list>
#include <future>
#include <chrono>
#include <fstream>
#include <string>
#include <sstream>
using namespace std::chrono;

template<typename T>
std::list<T> quick_sort(std::list<T> input){
    if(input.empty())
        return input;
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    const T &pivot = *result.begin();
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [pivot](const T &it) { return it < pivot; });
    std::list<T> lower_part;
    lower_part.splice(lower_part.end(), input, input.begin(), divide_point);
    auto new_lower(quick_sort(std::move(lower_part)));
    auto new_higher(quick_sort(std::move(input)));
    result.splice(result.begin(), new_lower);
    result.splice(result.end(), new_higher);
    return result;
}

template <typename T>
std::list<T> parallel_quick_sort(std::list<T> input, int depth = 0) {
    if (input.empty())
        return input;
    if (depth > 5) {                       // limit how deep we spawn async tasks
        return quick_sort(std::move(input));
    }
    std::list<T> result;
    result.splice(result.begin(), input, input.begin());
    const T &pivot = *result.begin();
    auto divide_point = std::partition(input.begin(), input.end(),
                                       [pivot](const T &it) { return it < pivot; });
    std::list<T> lower_part;
    lower_part.splice(lower_part.begin(), input, input.begin(), divide_point);
    std::future<std::list<T>> new_lower(
        std::async(&parallel_quick_sort<T>, std::move(lower_part), depth + 1));
    auto new_higher(parallel_quick_sort(std::move(input), depth + 1));
    result.splice(result.end(), new_higher);
    result.splice(result.begin(), new_lower.get());
    return result;
}

int main(){
    std::ifstream file("random_numbers.txt");
    std::list<int> list{};
    std::string line;
    while(std::getline(file, line)){
        std::stringstream ss(line);
        std::string num;
        while(std::getline(ss, num, ' ')){
            if(!num.empty())
                list.push_back(std::stoi(num));
        }
    }
    auto last = steady_clock::now();
    auto list2 = quick_sort(list);
    auto now = steady_clock::now();
    std::cout << duration_cast<microseconds>((now - last)).count() << std::endl;
    return 0;
}
```
Atomic Operations
std::atomic_flag
The simplest standard atomic type: a boolean flag with two states, set and clear.
It must be initialized with the macro ATOMIC_FLAG_INIT (required before C++20).
clear() — resets the flag to the clear (false) state
test_and_set() — atomically reads the previous value and sets the flag
```cpp
#include <iostream>
#include <thread>
#include <atomic>

class spinlock_mutex {
public:
    spinlock_mutex() : flag(ATOMIC_FLAG_INIT){}
    void lock(){
        while(flag.test_and_set(std::memory_order_acquire));  // spin until cleared
    }
    void unlock(){
        flag.clear(std::memory_order_release);
    }
private:
    std::atomic_flag flag;
};

int main(int argc, char const *argv[]){
    spinlock_mutex spinlock;
    std::thread t1([&spinlock]() {
        spinlock.lock();
        for (int i = 0; i < 3; ++i){
            std::cout << "*";
        }
        spinlock.unlock();
    });
    std::thread t2([&spinlock]() {
        spinlock.lock();
        for (int i = 0; i < 3; ++i){
            std::cout << "?";
        }
        spinlock.unlock();
    });
    t1.join();
    t2.join();
    return 0;
}
```
std::atomic<T>
is_lock_free() — whether operations on this type are implemented directly with atomic instructions.
Supports load(), store(), exchange(), compare_exchange_weak() and compare_exchange_strong(), among other operations.
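A small sketch of these operations on std::atomic<int> (the helper name is illustrative); note that on a failed compare-exchange, `expected` is updated to the value actually observed:

```cpp
#include <atomic>
#include <cassert>

// load/store/exchange/compare_exchange_strong in one pass.
int atomic_ops_demo() {
    std::atomic<int> counter{0};
    counter.store(5);
    int old = counter.exchange(7);                  // old == 5, counter == 7
    int expected = 7;
    counter.compare_exchange_strong(expected, 9);   // 7 == 7: counter -> 9
    expected = 100;
    bool ok = counter.compare_exchange_strong(expected, 1);  // mismatch: fails
    assert(!ok && expected == 9);                   // expected now holds 9
    return old + counter.load();                    // 5 + 9
}
```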
Memory ordering
Every operation on an atomic type accepts an optional extra argument drawn from the enumeration std::memory_order:
std::memory_order_relaxed, std::memory_order_consume, std::memory_order_acquire, std::memory_order_release, std::memory_order_acq_rel and std::memory_order_seq_cst.
Store operations may use std::memory_order_relaxed, std::memory_order_release or std::memory_order_seq_cst.
Load operations may use std::memory_order_relaxed, std::memory_order_consume, std::memory_order_acquire or std::memory_order_seq_cst.
Read-modify-write operations may use any of the six orderings.
By default atomic operations use std::memory_order_seq_cst, sequentially consistent ordering.
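The classic release/acquire pairing can be sketched as follows (the function name is illustrative): the release store publishes the plain write to `data`, and an acquire load that observes `true` is guaranteed to also see `data == 42`.

```cpp
#include <atomic>
#include <thread>

// Release/acquire pairing: the write to `data` happens-before the
// reader's access once the reader observes `ready == true`.
int payload_via_release_acquire() {
    int data = 0;
    std::atomic<bool> ready{false};
    std::thread writer([&] {
        data = 42;                                     // plain (non-atomic) write
        ready.store(true, std::memory_order_release);  // publish
    });
    int seen = -1;
    std::thread reader([&] {
        while (!ready.load(std::memory_order_acquire)) {}  // spin until published
        seen = data;  // guaranteed to observe 42
    });
    writer.join();
    reader.join();
    return seen;
}
```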
Designing Concurrent Data Structures
A thread-safe concurrent stack
```cpp
template<typename T>
class ThreadSafeStack {
public:
    ThreadSafeStack(){}
    ThreadSafeStack(const ThreadSafeStack &t){
        std::lock_guard<std::mutex> lock(t.m_mtx);
        m_stack = t.m_stack;
    }
    ThreadSafeStack &operator=(const ThreadSafeStack &) = delete;
    void push(const T &value){
        std::lock_guard<std::mutex> lock(m_mtx);
        m_stack.push(value);
        m_cv.notify_one();
    }
    void wait_and_pop(T &value){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this](){ return !m_stack.empty(); });
        value = std::move(m_stack.top());
        m_stack.pop();
    }
    std::shared_ptr<T> wait_and_pop(){
        std::unique_lock<std::mutex> lock(m_mtx);
        m_cv.wait(lock, [this](){ return !m_stack.empty(); });
        std::shared_ptr<T> res = std::make_shared<T>(std::move(m_stack.top()));
        m_stack.pop();
        return res;
    }
    bool try_pop(T &value){
        std::lock_guard<std::mutex> lock(m_mtx);
        if(m_stack.empty())
            return false;
        value = std::move(m_stack.top());
        m_stack.pop();
        return true;
    }
    std::shared_ptr<T> try_pop(){
        std::lock_guard<std::mutex> lock(m_mtx);
        if(m_stack.empty())
            return std::shared_ptr<T>();
        std::shared_ptr<T> res = std::make_shared<T>(std::move(m_stack.top()));
        m_stack.pop();
        return res;
    }
    bool empty() const {
        std::lock_guard<std::mutex> lock(m_mtx);
        return m_stack.empty();
    }
private:
    std::stack<T> m_stack;
    mutable std::mutex m_mtx;
    std::condition_variable m_cv;
};
```
A concurrent queue
A singly linked list is the simplest data structure that can serve as a queue:
```cpp
#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <memory>

template<typename T>
class ThreadSafeQueue {
private:
    struct node{
        std::shared_ptr<T> data;
        std::unique_ptr<node> next;
    };
    node* get_tail(){
        std::lock_guard<std::mutex> lock(tail_mutex);
        return tail;
    }
    std::unique_ptr<node> pop_head(){
        std::unique_ptr<node> old_head = std::move(head);
        head = std::move(old_head->next);
        return old_head;
    }
    std::unique_lock<std::mutex> wait_for_data(){
        std::unique_lock<std::mutex> head_lock(head_mutex);
        m_cv.wait(head_lock, [&]() { return head.get() != get_tail(); });
        return head_lock;
    }
    std::unique_ptr<node> wait_pop_head(){
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        return pop_head();
    }
    std::unique_ptr<node> wait_pop_head(T &value){
        std::unique_lock<std::mutex> head_lock(wait_for_data());
        value = std::move(*head->data);
        return pop_head();
    }
    std::unique_ptr<node> try_pop_head(){
        std::lock_guard<std::mutex> lock(head_mutex);
        if(head.get() == get_tail()){
            return std::unique_ptr<node>();
        }
        return pop_head();
    }
    std::unique_ptr<node> try_pop_head(T &value){
        std::lock_guard<std::mutex> head_lock(head_mutex);
        if(head.get() == get_tail()){
            return std::unique_ptr<node>();
        }
        value = std::move(*head->data);
        return pop_head();
    }
public:
    ThreadSafeQueue() : head(new node), tail(head.get()){}
    ThreadSafeQueue(const ThreadSafeQueue &) = delete;
    ThreadSafeQueue &operator=(const ThreadSafeQueue &) = delete;
    void push(T new_value){
        std::shared_ptr<T> new_ptr = std::make_shared<T>(std::move(new_value));
        std::unique_ptr<node> p(new node);  // new dummy tail node
        {
            std::lock_guard<std::mutex> lock(tail_mutex);
            tail->data = new_ptr;
            node *new_tail = p.get();
            tail->next = std::move(p);
            tail = new_tail;
        }
        m_cv.notify_one();
    }
    std::shared_ptr<T> wait_and_pop(){
        const std::unique_ptr<node> old_head = wait_pop_head();
        return old_head->data;
    }
    void wait_and_pop(T &value){
        const std::unique_ptr<node> old_head = wait_pop_head(value);
    }
    std::shared_ptr<T> try_pop(){
        std::unique_ptr<node> old_head = try_pop_head();
        return old_head ? old_head->data : std::shared_ptr<T>();
    }
    bool try_pop(T &value){
        const std::unique_ptr<node> old_head = try_pop_head(value);
        return old_head != nullptr;
    }
    bool empty(){
        std::lock_guard<std::mutex> lock(head_mutex);
        return (head.get() == get_tail());
    }
private:
    std::mutex head_mutex;
    std::mutex tail_mutex;
    std::unique_ptr<node> head;
    node *tail;
    std::condition_variable m_cv;
};

std::mutex read_mtx;
void printInt(const std::string con, int i){
    std::lock_guard<std::mutex> lock(read_mtx);
    std::cout << con << " : " << i << std::endl;
}

int main(int argc, char const *argv[]) {
    ThreadSafeQueue<int> safe_que;
    std::thread consumer1([&]() {
        while(true){
            std::shared_ptr<int> data = safe_que.wait_and_pop();
            printInt("consumer1", *data);
        }
    });
    std::thread consumer2([&]() {
        while(true){
            std::shared_ptr<int> data = safe_que.wait_and_pop();
            printInt("consumer2", *data);
        }
    });
    std::thread producer([&]() {
        for (int i = 0; i < 100; ++i){
            safe_que.push(i);
        }
    });
    consumer1.join();
    consumer2.join();
    producer.join();
    return 0;
}
```
Principles for thread-safe concurrent code
To judge whether an operation is thread-safe, pay attention to these key points:
1. Shared-resource access
If multiple threads access the same resource (a variable, object, file, database, etc.) and at least one thread modifies it, that access must be made thread-safe.
2. Atomicity
Make sure the operation is atomic, i.e. cannot be interrupted: it either executes completely or not at all. Atomic operations are usually backed by hardware (processor atomic instructions) or implemented with locks.
3. Visibility
Make sure one thread's modifications to data are visible to the other threads, via appropriate memory barriers or synchronization mechanisms. Without visibility guarantees, a thread may see stale data instead of what another thread just wrote.
4. Ordering
Make sure operations execute in the expected order. In a multithreaded environment the compiler and processor may reorder instructions for performance, which can produce unexpected cross-thread behavior; synchronization mechanisms enforce the required ordering.
Thread Pools
1. a task queue to represent pending work
2. tasks wrapped as callables
3. workers suspend when idle and are notified when a task arrives
Limitations of thread pools:
1. ill-suited to tasks that must execute in a fixed order;
2. ill-suited to tasks that are strongly coupled or mutually exclusive;
A minimal thread-pool model
```cpp
#include <iostream>
#include <thread>
#include <queue>
#include <condition_variable>
#include <mutex>
#include <vector>
#include <functional>
#include <chrono>

class ThreadPool{
public:
    ThreadPool(int numThreads) : stop(false){
        for (int i = 0; i < numThreads; ++i){
            threads.emplace_back([this]{
                while(true){
                    std::unique_lock<std::mutex> lock(mtx);
                    condition.wait(lock, [this]{ return !tasks.empty() || stop; });
                    if(stop && tasks.empty()){
                        return;
                    }
                    std::function<void()> task(std::move(tasks.front()));
                    tasks.pop();
                    lock.unlock();
                    task();
                }
            });
        }
    }
    ~ThreadPool(){
        {
            std::unique_lock<std::mutex> lock(mtx);
            stop = true;
        }
        condition.notify_all();
        for(auto &t : threads){
            t.join();
        }
    }
    template <class F, class... Args>
    void enqueue(F &&f, Args &&...args){
        std::function<void()> task = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
        std::unique_lock<std::mutex> lock(mtx);
        tasks.emplace(std::move(task));
        lock.unlock();
        condition.notify_one();
    }
private:
    std::vector<std::thread> threads;
    std::queue<std::function<void()>> tasks;
    std::mutex mtx;
    std::condition_variable condition;
    bool stop;
};

int main(){
    ThreadPool pool(4);
    for (int i = 0; i < 10; ++i){
        pool.enqueue([i]{
            std::cout << "task " << i << " is running" << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(1));
            std::cout << "task " << i << " is done" << std::endl;
        });
    }
    return 0;
}
```
A singleton thread pool
```cpp
#include <iostream>
#include <thread>
#include <queue>
#include <vector>
#include <mutex>
#include <condition_variable>
#include <future>
#include <functional>
#include <atomic>
#include <chrono>

class ThreadPool {
    using Task = std::packaged_task<void()>;
public:
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool &operator=(ThreadPool &) = delete;
    static ThreadPool& GetInstance(){
        static ThreadPool t;
        return t;
    }
    template<typename T, typename ...Args>
    auto commit(T func, Args&& ...args) -> std::future<decltype(func(args...))>{
        using ReturnType = decltype(func(args...));
        if(m_stop.load())
            return std::future<ReturnType>();
        auto task = std::make_shared<std::packaged_task<ReturnType()>>(
            std::bind(std::forward<T>(func), std::forward<Args>(args)...));
        std::future<ReturnType> ret = task->get_future();
        {
            std::lock_guard<std::mutex> lock(m_mtx);
            m_tasks.emplace([task]{ (*task)(); });
        }
        m_cv.notify_one();
        return ret;
    }
private:
    ThreadPool(unsigned int num = 4) : m_stop(false){
        if(num < 1)
            m_thread_num = 1;
        else
            m_thread_num = num;
        init_thread();
    }
    void init_thread(){
        for (int i = 0; i < m_thread_num; ++i){
            m_thread_pool.emplace_back([this] {
                while(!this->m_stop.load()){
                    Task task;
                    {
                        std::unique_lock<std::mutex> lock(m_mtx);
                        m_cv.wait(lock, [this]{
                            // wake when the pool is stopping or a task is available
                            return m_stop.load() || !m_tasks.empty();
                        });
                        if(m_tasks.empty()){
                            return;
                        }
                        task = std::move(m_tasks.front());
                        m_tasks.pop();
                    }
                    --m_thread_num;
                    std::cout << "thread " << std::this_thread::get_id()
                              << " is getting task" << std::endl;
                    task();
                    ++m_thread_num;
                }
            });
        }
    }
    ~ThreadPool(){
        m_stop = true;
        m_cv.notify_all();
        for (auto &it : m_thread_pool){
            if(it.joinable()){
                std::cout << "join thread " << it.get_id() << std::endl;
                it.join();
            }
        }
    }
private:
    std::queue<std::packaged_task<void()>> m_tasks;
    std::vector<std::thread> m_thread_pool;
    std::mutex m_mtx;
    std::condition_variable m_cv;
    std::atomic_bool m_stop;
    std::atomic_int m_thread_num;
};

int main(int argc, char const *argv[]) {
    int num = 0;
    std::this_thread::sleep_for(std::chrono::seconds(2));
    ThreadPool::GetInstance().commit([](int &num){
        num = 10;
        std::cout << "into thread num :" << num << std::endl;
    }, std::ref(num));
    std::this_thread::sleep_for(std::chrono::seconds(2));
    std::cout << "num is :" << num << std::endl;
    return 0;
}
```
Practice
An asynchronous logger
| #include <iostream>
  #include <string>
  #include <queue>
  #include <any>
  #include <sstream>
  #include <memory>
  #include <mutex>
  #include <condition_variable>
  #include <thread>
  #include <chrono>
  #include <ctime>
  #include <iomanip>

  enum LogLevel {
      DEBUG = 0,
      INFO = 1,
      WARNING = 2,
      ERROR = 3
  };

  static void currentTime() {
      auto now = std::chrono::system_clock::now();
      std::time_t now_time_t = std::chrono::system_clock::to_time_t(now);
      std::tm *now_tm = std::localtime(&now_time_t);
      std::cout << std::put_time(now_tm, "[%y-%m-%d %H:%M:%S]");
  }

  class LogTask {
  public:
      LogTask() {}
      LogTask(const LogTask &log) : m_level(log.m_level), m_logData(log.m_logData) {}
      // a const rvalue reference cannot actually be moved from, so the
      // move constructor must take LogTask&&
      LogTask(LogTask &&log) noexcept
          : m_level(log.m_level), m_logData(std::move(log.m_logData)) {}
  public:
      LogLevel m_level;
      std::queue<std::any> m_logData;
  };

  class AsyncLogger {
  private:
      AsyncLogger() : m_stop(false) {
          m_logThread = std::thread([this]() {
              while (true) {
                  std::unique_lock<std::mutex> lock(m_mtx);
                  m_cv.wait(lock, [this]() {
                      return !m_logQueue.empty() || m_stop;
                  });
                  if (m_stop && m_logQueue.empty())
                      return;   // drain the queue before shutting down
                  auto logMsg = m_logQueue.front();
                  m_logQueue.pop();
                  lock.unlock();   // print outside the lock
                  printLogTask(logMsg);
              }
          });
      }
      AsyncLogger(const AsyncLogger &) = delete;
      AsyncLogger &operator=(const AsyncLogger &) = delete;
      ~AsyncLogger() {
          {
              // set the stop flag under the lock so the log thread cannot
              // miss the notification between its predicate check and wait
              std::unique_lock<std::mutex> lock(m_mtx);
              m_stop = true;
          }
          m_cv.notify_all();
          if (m_logThread.joinable())
              m_logThread.join();
          currentTime();
          std::cout << " Exit Logger Success!";
      }

      void printLogTask(std::shared_ptr<LogTask> logTaskPtr) {
          currentTime();
          std::cout << "[" << logLevelToString(logTaskPtr->m_level) << "]";
          while (!logTaskPtr->m_logData.empty()) {
              try {
                  std::cout << convertToString(logTaskPtr->m_logData.front());
              } catch (const std::bad_any_cast &) {
                  std::cout << "(Invalid Log Data Type)";
              }
              logTaskPtr->m_logData.pop();
          }
          std::cout << std::endl;
      }

      std::string convertToString(const std::any &data) {
          std::ostringstream oss;
          if (data.type() == typeid(int))
              oss << std::any_cast<int>(data);
          else if (data.type() == typeid(double))
              // main() logs doubles such as 3.12, so they must be handled here
              oss << std::any_cast<double>(data);
          else if (data.type() == typeid(std::string))
              oss << std::any_cast<std::string>(data);
          else if (data.type() == typeid(const char *))
              oss << std::any_cast<const char *>(data);
          else
              throw std::bad_any_cast();
          return oss.str();
      }

      const char *logLevelToString(LogLevel level) {
          switch (level) {
              case DEBUG:   return "DEBUG";
              case INFO:    return "INFO";
              case WARNING: return "WARNING";
              case ERROR:   return "ERROR";
              default:      return "UNKNOWN TYPE";
          }
      }

  public:
      static AsyncLogger &GetInstance() {
          static AsyncLogger log;
          return log;
      }

      template<typename... Args>
      void AsyncWrite(LogLevel logLevel, Args&&... args) {
          std::shared_ptr<LogTask> task = std::make_shared<LogTask>();
          task->m_level = logLevel;
          (task->m_logData.push(args), ...);   // C++17 fold: push each argument in order
          std::unique_lock<std::mutex> lock(m_mtx);
          bool notify = m_logQueue.empty();    // only notify on the empty -> non-empty edge
          m_logQueue.push(task);
          lock.unlock();
          if (notify) {
              m_cv.notify_one();
          }
      }

      template<typename... Args>
      void AsyncWriteInfo(Args&&... args)    { AsyncWrite(INFO, std::forward<Args>(args)...); }
      template<typename... Args>
      void AsyncWriteError(Args&&... args)   { AsyncWrite(ERROR, std::forward<Args>(args)...); }
      template<typename... Args>
      void AsyncWriteDebug(Args&&... args)   { AsyncWrite(DEBUG, std::forward<Args>(args)...); }
      template<typename... Args>
      void AsyncWriteWarning(Args&&... args) { AsyncWrite(WARNING, std::forward<Args>(args)...); }

  private:
      std::queue<std::shared_ptr<LogTask>> m_logQueue;
      std::condition_variable m_cv;
      std::mutex m_mtx;
      bool m_stop;   // only touched under m_mtx
      std::thread m_logThread;
  };

  void func_error() {
      for (int i = 0; i < 100; ++i) {
          AsyncLogger::GetInstance().AsyncWriteError(i);
      }
  }
  void func_info() {
      for (int i = 0; i < 100; ++i) {
          AsyncLogger::GetInstance().AsyncWriteInfo(i);
      }
  }

  int main(int argc, char const *argv[]) {
      AsyncLogger::GetInstance().AsyncWrite(LogLevel::INFO, 3.12, "HELLO", 321);
      AsyncLogger::GetInstance().AsyncWrite(LogLevel::ERROR, "INPUT ERROR");
      AsyncLogger::GetInstance().AsyncWriteError("haha this is a error", 132.123);
      std::thread t1(func_error);
      std::thread t2(func_info);
      t1.join();
      t2.join();
      return 0;
  }
|