C++多线程
Published:
C++
创建thread
thread t1(func, values)
例子
// need the rederence of some_val1 void some_func(int some_val, int& some_val1) { do_something } ... int c = 1; thread t1(some_func, 0, ref(c));
所有变量默认按值传输,如需引用地址,则需要添加ref包裹。
添加到主函数
t1.join()
不添加到主函数,则该线程的运行是不确定的,有可能主函数结束了,但该线程仍未结束,所以有可能造成访问不该访问的地址的错误(segmentation fault).
调用当前线程的方法
#include <thread> #include <chrono> ... //this_thread //sleep method this_thread::sleep_for(chrono::milliseconds(100)) this_thread::get_id()
mutex类
线程锁,可使用
lock()
和unlock()
方法lock_guard类
单次线程锁,自动加锁,自动解锁。
为一个带
()
方法的类创建threadclass Factor { public: void openrator()() { do_something } } int main() { // passing the class with instance Factor f; thread t(f); // pass by the class itself // extra () used to explicitly declare // that a new Factor class should be // passed rather than the constructor function // because c++ automatically treats () as function thread t1((Factor())); }
获取硬件容量
thread::hardware_concurrency()
推迟加入t1
try { ... // catching any exception } catch(...) { t1.join(); throw; }
move方法
相当于把内容转移到另一块内存(?),move过后原先的变量将变得无法访问。
string s = "Something" // s is no longer accessible in the current scope thread t2(some_func, move(s))
move之后s的作用域改变。
thread相互赋值
只能使用move方法
thread t3 = move(t1)
Thread-safe logger
多线程程序中, 要对线程间公用的资源进行保护。
比如shared_print
void shared_print(string msg) { lock_guard<mutex> guard(mu); cout << msg << " " << id << endl; }
公用一个ofstream instance
class LogFile { mutex mut; ofstream f; public: LogFile() {} ~LogFile() {f.close();} void shared_print() { // lazy loading, load it when used // instead of loading in the constructor if (!f.is_open()) { f.open("log.txt", fstream::out); } lock_guard<mutex> locker(mut) } }
注意为了实现线程安全, 不能让公用的ofstream泄露,即不能让ofstream在缺少mutex保护的情况下对被外界访问。所以getter是不能有的。同时不能让这个f作为外部传入函数的参量,比如:
void processf(void fun(ofstream &f)){fun(f)} -> error
dead lock
线程锁上锁和解锁的顺序不一样。一个先上锁的线程锁如比后上锁的线程锁先解锁则会出现错误,因为此时原先线程锁还处在后上线程锁的保护中。这样的锁是解不开的。
mut1.lock() mut.lock() mut1.unlock() ..oops mut.lock()
解决方法:用lock_guard, 尽量只用一个锁。用std::lock()给多个线程同时上锁。或者按相同顺序上锁和解锁。
13. Granularity
Fine-grained lock: small amoung of data
Coarse-grained lock: protects big amount of data
- Unique_lock
多次上锁解锁的线程锁。
例子
class LogFile {
mutex mut;
mutex mut_open;
once_flag flag;
ofstream f;
public:
LogFile() {}
~LogFile() {f.close();}
void shared_print(string msg) {
unique_lock<mutex> locker(mut, defer_lock);
locker.lock();
cout << msg << endl;
locker.unlock();
}
}
注意unique_lock不能直接互相赋值(can’t be copied),需要move,与thread的要求相似。
相关配置:
defer_lock => do not acquire ownership of the mutex 不获取mutex所有权
try_to_lock => try to acquire ownership of the mutex without blocking 尝试锁上线程锁
adopt_lock => assume calling thread already has ownership of the mutex 假设母线程拥有该锁
- call_once
还是在刚才的LogFile例子中,实现一次打开文件,同样是lazy_loading,但是使用call_once进行控制。
void share_print1(string msg) {
// each thread can only call once
call_once(flag, [&](){f.open("log.txt"); printf("log opened.\n");});
unique_lock<mutex> locker(mut, defer_lock);
...
}
- condition variables
条件变量,用于决定调用的先后顺序。
例子
#include<time.h>
#include<queue>
#include<fstream>
using namespace std;
deque<int> q;
mutex mut;
condition_variable cond;
void function_1() {
int count = 10;
while (count > 0) {
unique_lock<mutex> locker(mut);
q.push_front(count);
locker.unlock();
// Notify one waiting thread, if there is one.
cond.notify_one();
// cond.notify_all();
// wake up all the threads;
this_thread::sleep_for(chrono::seconds(1));
count--;
}
}
void function_2() {
int data = 0;
while (data != 1) {
unique_lock<mutex> locker(mut);
cond.wait(locker, [](){ return !q.empty(); }); // spurious wake(?)
data = q.back();
q.pop_back();
locker.unlock();
cout << "t2 got a value from t1: " << data << endl;
cond.wait similar logic to the following code
/* instead of writing the following codes
if (!q.empty()) {
data = q.back();
q.pop_back();
locker.unlock();
cout << "t2 got a value from t1: " << data << endl;
} else {
// sleep => not mutex, unlock then sleep, or lock everybody out
locker.unlock();
this_thread::sleep_for(chrono::milliseconds(10));
} */
}
}
int main() {
thread t1(function_1);
thread t2(function_2);
t1.join();
t2.join();
return 0;
}
定义一个条件变量cond。因为队列的读取依赖于插入,所以func1先进行插入操作, 然后notify_one,通知正在等待的线程插入进行完毕,随后func2继续执行。在需要等待的地方使用cond.wait(locker, condition), 在得到通知之前locker是不上锁的。condition是相当于另外的控制条件。在这里是队列不为空。所以仅仅当func2收到通知并且队列不为空的情况下才会继续操作。这个写法等同于让func2轮询(隔一段时间查)q是否有元素了。这种方法比起cond是有风险的,不能保证时间间隔控制的很好。
- async and future
future<int> fu = async(launch::deferred, factorial, 4);
===> thread t1(factorial, 4)
int x= fu.get()
async会创建一个线程。deferred设置让此线程的执行推迟直到有get请求的时候。
注意不能get两次,因为再get这个线程已经结束了。
promise<int> p;
p.set_value(4);
这个promise默认从future那得来的值是4。
设置future_errc::broke_promise
p.set_exception(make_exception_ptr(runtime_error("Promise broken")));
同样,future和promise只能使用move进行移动。不能直接copy。
但是shared_future是可以被copy的,它被线程当做参数,具有广播模型的特征。(将变更给所有使用该future的promise)(?)
packaged_task
(TBD)