C++多线程

3 minute read

Published:

C++

  1. 创建thread

    thread t1(func, values)
    

    例子

    // need the rederence of some_val1
    void some_func(int some_val int& some_val1) {
        do_something
    }
    ...
    int c = 1;
    thread t1(some_func, 0, ref(c));
    

    所有变量默认按值传输,如需引用地址,则需要添加ref包裹。

  2. 添加到主函数

    t1.join()
    

    不添加到主函数,则该线程的运行是不确定的,有可能主函数结束了,但该线程仍未结束,所以有可能造成访问不该访问的地址的错误(segmentation fault).

  3. 调用当前线程的方法

    #include <thread>
    #include <chrono>
    ...
    //this_thread
    //sleep method
    this_thread::sleep_for(chrono::milliseconds(100))
    this_thread::get_id()
    
  4. mutex类

    线程锁,可使用lock()unlock()方法

  5. lock_guard类

    单次线程锁,自动加锁,自动解锁。

  6. 为一个带()方法的类创建thread

    class Factor {
        public:
        void openrator()() {
            do_something
        }
    }
       
    int main() {
        // passing the class with instance
        Factor f;
    	thread t(f);
           
        // pass by the class itself
        // extra () used to explicitly declare
        // that a new Factor class should be
        // passed rather than the constructor function
        // because c++ automatically treats () as function
        thread t1((Factor()));
          
    }
    
  7. 获取硬件容量

    thread::hardware_concurrency()
    
  8. 推迟加入t1

    try {
        ...
        // catching any exception
    } catch(...) {
        t1.join();
        throw;
    }
    
  9. move方法

    相当于把内容转移到另一块内存(?),move过后原先的变量将变得无法访问。

    string s = "Something"
    // s is no longer accessible in the current scope
    thread t2(some_func, move(s))
    

    move之后s的作用域改变。

  10. thread相互赋值

    只能使用move方法

    thread t3 = move(t1)
    
  11. Thread-safe logger

    多线程程序中, 要对线程间公用的资源进行保护。

    比如shared_print

    void shared_print(string msg) {
        lock_guard<mutex> guard(mu);
        cout << msg << " " << id << endl;
    } 
    

    公用一个ofstream instance

    class LogFile {
        mutex mut;
        ofstream f;
    public:
     	LogFile() {}
        ~LogFile() {f.close();}
        void shared_print() {
            // lazy loading, load it when used
            // instead of loading in the constructor
            if (!f.is_open()) {
                f.open("log.txt", fstream::out);
            }
            lock_guard<mutex> locker(mut)
        }
    }
    

    注意为了实现线程安全, 不能让公用的ofstream泄露,即不能让ofstream在缺少mutex保护的情况下对被外界访问。所以getter是不能有的。同时不能让这个f作为外部传入函数的参量,比如:

    void processf(void fun(ofstream &f)){fun(f)} -> error
    
  12. dead lock

    线程锁上锁和解锁的顺序不一样。一个先上锁的线程锁如比后上锁的线程锁先解锁则会出现错误,因为此时原先线程锁还处在后上线程锁的保护中。这样的锁是解不开的。

    mut1.lock()
    mut.lock()
    mut1.unlock() ..oops
    mut.lock()
    

解决方法:用lock_guard, 尽量只用一个锁。用std::lock()给多个线程同时上锁。或者按相同顺序上锁和解锁。

13. Granularity

Fine-grained lock: small amoung of data

Coarse-grained lock: protects big amount of data

  1. Unique_lock

多次上锁解锁的线程锁。

例子

class LogFile {
    mutex mut;
    mutex mut_open;
    once_flag flag;
    ofstream f;
public:
    LogFile() {}
    ~LogFile() {f.close();}
    void shared_print(string msg) {
        unique_lock<mutex> locker(mut, defer_lock);
        locker.lock();
        cout << msg << endl;
        locker.unlock();
    }
}

注意unique_lock不能直接互相赋值(can’t be copied),需要move,与thread的要求相似。

相关配置:

defer_lock => do not acquire ownership of the mutex 不获取mutex所有权
try_to_lock => try to acquire ownership of the mutex without blocking 尝试锁上线程锁
adopt_lock => assume calling thread already has ownership of the mutex	假设母线程拥有该锁
  1. call_once

还是在刚才的LogFile例子中,实现一次打开文件,同样是lazy_loading,但是使用call_once进行控制。

void share_print1(string msg) {
    // each thread can only call once
    call_once(flag, [&](){f.open("log.txt"); printf("log opened.\n");});
    unique_lock<mutex> locker(mut, defer_lock);
    ...
}
  1. condition variables

条件变量,用于决定调用的先后顺序。

例子

#include<time.h>
#include<queue>
#include<fstream>

using namespace std;

deque<int> q;
mutex mut;
condition_variable cond;

void function_1() {
    int count = 10;
    while (count > 0) {
        unique_lock<mutex> locker(mut);
        q.push_front(count);
        locker.unlock();
        // Notify one waiting thread, if there is one.
        cond.notify_one(); 
        // cond.notify_all();
        // wake up all the threads;
        this_thread::sleep_for(chrono::seconds(1));
        count--;
    }
}

void function_2() {
    int data = 0;
    while (data != 1) {
        unique_lock<mutex> locker(mut);
		cond.wait(locker, [](){ return !q.empty(); }); // spurious wake(?)
        data = q.back();
        q.pop_back();
        locker.unlock();
        cout << "t2 got a value from t1: " << data << endl;

        cond.wait similar logic to the following code
        /* instead of writing the following codes
        if (!q.empty()) {
            data = q.back();
            q.pop_back();
            locker.unlock();
            cout << "t2 got a value from t1: " << data << endl;
        } else {
            // sleep => not mutex, unlock then sleep, or lock everybody out
            locker.unlock();
            this_thread::sleep_for(chrono::milliseconds(10));
        } */
    }
}


int main() {
    thread t1(function_1);
    thread t2(function_2);
    t1.join();
    t2.join();
    return 0;
}

定义一个条件变量cond。因为队列的读取依赖于插入,所以func1先进行插入操作, 然后notify_one,通知正在等待的线程插入进行完毕,随后func2继续执行。在需要等待的地方使用cond.wait(locker, condition), 在得到通知之前locker是不上锁的。condition是相当于另外的控制条件。在这里是队列不为空。所以仅仅当func2收到通知并且队列不为空的情况下才会继续操作。这个写法等同于让func2轮询(隔一段时间查)q是否有元素了。这种方法比起cond是有风险的,不能保证时间间隔控制的很好。

  1. async and future
future<int> fu = async(launch::deferred, factorial, 4);
===> thread t1(factorial, 4)

int x= fu.get()

async会创建一个线程。deferred设置让此线程的执行推迟直到有get请求的时候。

注意不能get两次,因为再get这个线程已经结束了。

promise<int> p;
p.set_value(4);

这个promise默认从future那得来的值是4。

设置future_errc::broke_promise

p.set_exception(make_exception_ptr(runtime_error("Promise broken")));

同样,future和promise只能使用move进行移动。不能直接copy。

但是shared_future是可以被copy的,它被线程当做参数,具有广播模型的特征。(将变更给所有使用该future的promise)(?)

  1. packaged_task

    (TBD)