TimelineServer: 线程同步总结
同步机制
互斥锁(mutex):
线程访问临界资源前加锁,多个线程同时访问时,成功加锁的线程可以访问,其他线程等待持锁线程释放锁后继续竞争
临界资源只能被一个线程访问的情况下使用-
等待某个条件的达成
利用互斥锁想要监视某个条件是否达成可以使用轮询的方法(不断加锁访问关键变量直到条件满足),但这显然是低效的
利用条件变量可以将上述”轮询”变成”通知 -
信号量可以看作在”互斥区”门口放一个盒子,里面有很多小球.
每个线程进入互斥区前必须从盒子里拿出一个球(门票)
同时有的线程会为盒子补充小球(也可能是拿走小球的线程离开互斥区时放回去)
CPP 使用同步机制
CPP 也可以通过两种风格使用线程同步机制
C 风格
编译命令:
1 | g++ xxx.cpp -o xxx -lpthread |
C 线程 API
线程的创建与销毁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// thread_id ==> 返回被创建线程的 id, pthread_ 系列函数都会使用这个id来饮用线程
// attr ==> 线程属性,NULL 使用默认属性
// start_routine ==> 线程运行函数
// return --> 成功时返回0,失败时返回错误码
int pthread_create(pthread_t *thread_id,
pthread_attr_t *attr,
void *(*start_routine)(void *),
void *arg);
// 线程函数最后都最好调用该函数以安全干净地退出
// 通过retval向线程回收者传递其退出信息
void pthread_exit(void *retval);
// 回收其他线程
// thread_id ==> 被回收线程的id
// retval ==> 目标线程退出信息
// return --> 成功时返回0,失败时返回错误码
int pthread_join(pthread_t thread_id, void **retval);
// 分离一个线程, 线程在结束后会自行释放资源,无需再回收
// thread_id ==> 被分离线程的id
int pthread_detach(pthread_t th);
// 异常终止一个线程
// thread_id ==> 被终止线程的id
// return --> 成功时返回0,失败时返回错误码
int pthread_cancel(pthread_t thread_id);
// 设置当前线程接收到终止信号的反映以及保存之前的设置
int pthread_setcancelstate(int state, int *oldstate);
int pthread_setcanceltype(int type, int *oldtype);线程属性
属性中需要注意的就一个
detachstate
.其默认是 PTHREAD_CREATE_JOINABLE,可改为 PTHREAD_CREATE_DETACH(作用与 pthread_detach 一样).
但是也可以直接在创建完之后用pthread_detach
,效果一样.
所以属性一般来说直接传入NULL
使用默认参数就行.1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
union pthread_attr_t
{
char __size[__SIZEOF_PTHREAD_ATTR_T];
long int__align;
};
// 创建与销毁
int pthread_attr_init(pthread_attr_t *attr);
int pthread_attr_destroy(pthread_attr_t*attr);
// 更改属性
int pthread_attr_getdetachstate(const pthread_attr_t *attr, int*detachstate);
int pthread_attr_setdetachstate(pthread_attr_t *attr, int detachstate);
int pthread_attr_getstackaddr(const pthread_attr_t *attr, void **stackaddr);
int pthread_attr_setstackaddr(pthread_attr_t *attr, void *stackaddr);
int pthread_attr_getstacksize(const pthread_attr_t *attr, size_t*stacksize);
int pthread_attr_setstacksize(pthread_attr_t *attr, size_t stacksize);
int pthread_attr_getstack(const pthread_attr_t *attr, void **stackaddr, size_t *stacksize);
int pthread_attr_setstack(const pthread_attr_t *attr, void *stackaddr, size_t stacksize);
int pthread_attr_getguardsize(const pthread_attr_t *attr, size_t*guardsize);
int pthread_attr_setguardsize(pthread_attr_t *attr, size_t guardsize);
int pthread_attr_getschedparam(const pthread_attr_t *attr, struct sched_param*param);
int pthread_attr_setschedparam(pthread_attr_t *attr, struct, sched_param*param);
int pthread_attr_getschedpolicy(const pthread_attr_t *attr, int*policy);
int pthread_attr_setschedpolicy(pthread_attr_t *attr, int policy);
int pthread_attr_getinheritsched(const pthread_attr_t *attr, int*inherit);
int pthread_attr_setinheritsched(pthread_attr_t *attr, int inherit);
int pthread_attr_getscope(const pthread_attr_t *attr, int*scope);
int pthread_attr_setscope(pthread_attr_t *attr, int scope);C 同步 API
互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 初始化互斥锁
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *mutexattr);
// 实际上是把互斥锁所有字段都设置为0
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
// 销毁互斥锁,释放内核资源
int pthread_mutex_destroy(pthread_mutex_t *mutex);
// 以原子操作方式加锁
// 如果已经上锁则阻塞至解锁
int pthread_mutex_lock(pthread_mutex_t *mutex);
// pthread_mutex_lock 的非阻塞版本
// 已经上锁时返回错误码 EBUSY
int pthread_mutex_trylock(pthread_mutex_t *mutex);
// 以原子操作方式解锁
int pthread_mutex_unlock(pthread_mutex_t *mutex);
// 初始化/销毁属性对对象
int pthread_mutexattr_init(pthread_mutexattr_t *attr);
int pthread_mutexattr_destroy(pthread_mutexattr_t *attr);
// PTHREAD_PROCESS_SHARE: 跨进程共享
// PTHREAD_PROCESS_PRIVATE: 局部互斥锁
int pthread_mutexattr_getshared(const pthread_mutexattr_t *attr, int *pshared);
int pthread_mutexattr_setshared(const pthread_mutexattr_t *attr, int pshared);
// PTHREAD_MUTEX_NORMAL: 普通锁,对一个已加锁的锁上锁 <阻塞>,对一个已解锁的锁解锁 <导致不可预期后果>
// PTHREAD_MUTEX_ERRORCHECK: 检错锁,对一个已加锁的锁上锁 <直接返回EDEADLK>,对一个已解锁的锁解锁 <直接返回EPERM>
// PTHREAD_MUTEX_RECURSIVE: 嵌套锁,一个线程多次加锁会多重上锁 <要解锁需要解锁同样次数>.解锁其他线程加锁的/对已解锁的锁解锁 <返回EPERM>
// PTHREAD_MUTEX_DEFAULT: 默认锁,实际实现是上述三种之一
int pthread_mutexattr_gettype(const pthread_mutexattr_t *attr, int *type);
int pthread_mutexattr_settype(pthread_mutexattr_t *attr, int type);条件变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 创建与销毁,其属性与互斥锁类似,略过
int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *cond_attr);
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int pthread_cond_destroy(pthread_cond_t *cond);
// 唤醒所有等待目标变量的线程
int pthread_cond_broadcast(pthread_cond_t *cond);
// 唤醒一个等待线程
int pthread_cond_signal(pthread_cond_t *cond);
// 等待目标条件变量,需要一个互斥锁辅助实现该函数的原子性
// 调用前需要保证 mutex 已被加锁
// 阻塞前会释放 mutex,返回后会为 mutex 重新上锁
// 通过上述方式保证不会错过任何唤醒事件(自从为mutex加锁后)
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);**信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 初始化未命名信号量(重复初始化会导致不可预期后果)
// pshared ==> 0:局部信号来嗯, 1:跨进程信号量
// value ==> 初始值
// return --> 成功时返回0,失败时返回-1并设置errno
int sem_init(sem_t *sem, int pshared, unsigned int value);
// 销毁信号量(释放内核资源)
// 销毁其他线程正在使用的信号量会导致不可预期后果
int sem_destroy(sem_t *sem);
// 以原子操作的方式将信号量减一
// 如果信号量已经是0,则阻塞至其非0
int sem_wait(sem_t *sem);
// sem_wait 的非阻塞版本
// 如果信号量为0则返回-1并设置errno为EAGAIN
int sem_trywait(sem_t *sem);
// 以原子操作的方式将信号量加一
// 自动唤醒阻塞在 sem_wait 的线程
int sem_post(sem_t *sem);C 例子
互斥锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using std::cin;
using std::cout;
using std::endl;
// 互斥锁和临界资源
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
int resource = 0;
void *thread_func(void *arg)
{
pthread_mutex_lock(&mutex);
resource += 1;
cout << "resource is " << resource << " in thread " << pthread_self() << endl;
pthread_mutex_unlock(&mutex);
pthread_exit(NULL);
}
int main()
{
cout << "main thread id:" << pthread_self() << endl;
std::vector<pthread_t> threads;
for (int i = 0; i < 20; i++)
{
pthread_t id;
if (!pthread_create(&id, NULL, thread_func, NULL))
{
// 创建线程成功
threads.push_back(id);
}
}
for (pthread_t id : threads)
{
pthread_join(id, NULL);
}
pthread_mutex_destroy(&mutex);
return 0;
}条件变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
using std::cin;
using std::cout;
using std::endl;
// 互斥锁和临界资源
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
int resource = 0;
// 操作资源函数
void *producer(void *arg);
void *consumer(void *arg);
int main()
{
// 生成消费者(只消费一次,一次消耗10个资源)
pthread_t consumer_thread;
pthread_create(&consumer_thread, NULL, consumer, NULL);
// 生成11个生产者,各生产一次,一次生产1个资源
std::vector<pthread_t> producer_threads;
for (int i = 0; i < 11; i++)
{
pthread_t id;
if (!pthread_create(&id, NULL, producer, NULL))
{
// 创建线程成功
producer_threads.push_back(id);
}
}
// 等待所有线程结束
for (pthread_t id : producer_threads)
{
pthread_join(id, NULL);
}
pthread_join(consumer_thread, NULL);
// 销毁互斥量和条件变量
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cond);
return 0;
}
void *producer(void *arg)
{
pthread_mutex_lock(&mutex);
{
// 生产一个资源
resource += 1;
cout << "resource produced" << endl;
}
pthread_mutex_unlock(&mutex);
// 尝试唤醒
pthread_cond_signal(&cond);
pthread_exit(NULL);
}
void *consumer(void *arg)
{
cout << "The consumer thread begins." << endl;
pthread_mutex_lock(&mutex);
while (resource < 10)
{
pthread_cond_wait(&cond, &mutex);
cout << "the resource is " << resource << endl;
}
pthread_mutex_unlock(&mutex);
cout << "The consumer get enough resoure;" << endl;
pthread_exit(NULL);
}信号量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
using std::cin;
using std::cout;
using std::endl;
// 临界资源
sem_t resource_sem;
unsigned int resource;
// 操作资源函数
void *consumer(void *arg);
int main()
{
resource = 3;
sem_init(&resource_sem, 0, resource);
// 生成消费者
std::vector<pthread_t> consumer_threads;
for (int i = 0; i < 6; i++)
{
pthread_t id;
if (!pthread_create(&id, NULL, consumer, NULL))
{
// 创建线程成功
consumer_threads.push_back(id);
usleep(500000);
}
}
// 等待所有线程结束
for (pthread_t id : consumer_threads)
{
pthread_join(id, NULL);
}
// 销毁互斥量和条件变量
sem_destroy(&resource_sem);
return 0;
}
void *consumer(void *arg)
{
sem_wait(&resource_sem);
{
cout << "===== consumer " << pthread_self() << " starts =====" << endl;
int time = 3;
while (time > 0)
{
time -= 1;
cout << "consumer " << pthread_self() << " is processing." << endl;
sleep(1);
}
cout << "===== consumer " << pthread_self() << " completes =====" << endl;
}
sem_post(&resource_sem);
pthread_exit(NULL);
}CPP 风格
编译命令(只考虑 thread 库):
1 | g++ xxx.cpp -o xxx -pthread -std=c++11 |
CPP 线程 API
线程创建与运行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using std::cin;
using std::cout;
using std::endl;
void thread_func()
{
cout << "thread_func is running" << endl;
}
class thread_class
{
public:
void operator()() const
{
cout << "thread_class is running" << endl;
}
};
int main()
{
// 直接使用函数构造线程
std::thread test1(thread_func);
// 使用重写了 operator()() 的类对象构造线程
thread_class temp;
std::thread test2(temp);
// 注意这种方法会声明一个 test2 函数,而非构造一个对象
// 根本在于其中的 thread_class() 返回了一个临时对象
// std::thread test2(thread_class());
// 解决方法是:
// std::thread test2((thread_class()));
// std::thread test2{thread_class()};
// 使用 lambda 表达式构造线程
std::thread test3([]
{ cout << "lambda func is running" << endl; });
test1.join();
test2.join();
test3.detach();
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
using std::cin;
using std::cout;
using std::endl;
int main()
{
// 使用 lambda 表达式构造线程
std::thread test([]
{ cout << "thread is running" << endl; });
sleep(1);
// 需要在 thread 对象销毁前决定其是 join 方式还是 detach 方式
// 否则其会直接调用 terminate(), 此时再决定会触发异常
// join 之后再次 join 会触发异常
cout << ((true == test.joinable()) ? ("test1 is joinable") : ("test1 is unjoinable")) << endl;
test.join();
cout << ((true == test.joinable()) ? ("test1 is joinable") : ("test1 is unjoinable")) << endl;
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
using std::cin;
using std::cout;
using std::endl;
int main()
{
std::thread test([]
{ cout << "thread is running" << endl; });
try
{
sleep(1);
throw 500;
}
catch (...)
{
// 如果决定使用 join 方式
// 需要在错误处理的时候调用 join,以免出现生命周期问题
// 所以倾向于在不会出现错误的情况下使用join
test.join();
throw;
}
test.join();
return 0;
}多线程传参
参考资料:
线程函数传参详解
<<C++ Concurrency In Action>>
还有一部分是 thread 所有权的传递(move, 移动构造等)略过,可以参考
<<C++ Concurrency In Action>>
2.3 节1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
using std::cin;
using std::cout;
using std::endl;
// 新线程参数都会拷贝到线程独立内存中
// 向线程传递参数会使传递方式变为值传参
// 原本的值传参 i 会 <值传参>
// 引用传参会变成 <类似地址传参> 的方式
// 传递指针依然会 <复制指针的参数>
void func(int i, std::string const &s)
{
cout << s << endl;
}
int main()
{
// 直接使用字面量,字面量存在于字面量池中,不与主线程同生命周期,可以正常运行
// 字面量在线程上下文中完成向 std::string 的隐式转化
std::thread test1(func, 1, "hello test1");
test1.detach();
// buffer 本身是一个指针变量,指向栈中的变量
// 在主线程结束之后会销毁.
// 此时新线程拿到的还是指向老线程中变量的指针,会出现未定义行为
// char buffer[1024];
// sprintf(buffer, "%i", 666);
// std::thread test2(func, 2, buffer);
// test2.detach();
// 正确做法是在将参数传递到 std::thread 的构造函数之前将其转化为 std::string 对象
// 避免悬垂指针
char buffer[1024];
sprintf(buffer, "%i", 666);
std::thread test3(func, 1, std::string(buffer));
test3.detach();
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
using std::cin;
using std::cout;
using std::endl;
class Test
{
public:
int i;
int j;
Test(int i, int j) : i(i), j(j)
{
}
void show()
{
cout << "i:" << i << "\tj:" << j << endl;
}
};
void func(Test &t)
{
t.i = t.i + 1;
t.j = t.j + 1;
}
int main()
{
Test test_var(1, 2);
test_var.show();
// 希望传递一个对象的引用(默认会直接复制这个对象)
// 实际上直接写在参数里会编译失败
// 需要先将该对象转化为引用
// std::thread test_thread(func, test_var);
std::thread test_thread(func, std::ref(test_var));
test_thread.join();
test_var.show();
return 0;
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
using std::cin;
using std::cout;
using std::endl;
class Test
{
public:
int i;
int j;
Test(int i, int j) : i(i), j(j)
{
}
void show()
{
cout << "this addr: " << (void *)(this) << endl;
cout << "i:" << i << "\tj:" << j << endl;
}
};
int main()
{
// 将类成员函数作为线程函数,首个参数传递类对象,后面参数与之前相同
Test test_var(1, 2);
// test_var addr: 0x7fffffffdcf8
cout << "test_var addr: " << (void *)(&test_var) << endl;
// 复制式传参
// this addr: 0x55555556d2c8
std::thread test_thread1(&Test::show, test_var);
test_thread1.join();
// 引用式传参
// this addr: 0x7fffffffdcf8
std::thread test_thread2(&Test::show, std::ref(test_var));
test_thread2.join();
return 0;
}CPP 同步 API
互斥锁
1 |
|
条件变量
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
using std::cin;
using std::cout;
using std::endl;
std::mutex m;
std::condition_variable cond;
int main()
{
// 唤醒一个等待该条件变量的线程
// 如果没有任何进程等待,则无效
cond.notify_one();
// 唤醒所有等待线程
// 如果没有任何进程等待,则无效
cond.notify_all();
// 等待条件变量
std::unique_lock<std::mutex> lock(m);
cond.wait(lock);
// 带有时限的 wait
// cond.wait_for();
// cond.wait_until();
return 0;
}- 信号量(信号量在 C++20 才支持,暂不考虑这个)
本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 遗世の私语!