标签 多线程 下的文章

一、互斥量

互斥量的本身是一把锁,在访问共享资源时前对互斥量加锁,访问完成后释放。在对互斥量加锁后,其他线程如果想再次加锁,操作会被阻塞,直到锁被释放为止。

创建互斥量和销毁互斥量的函数定义:

#include <pthread.h>

int pthread_mutex_init(pthread_mutex_t *restrict mutex,
      const pthread_mutexattr_t *restrict attr);
int pthread_mutex_destroy(pthread_mutex_t *mutex);

也可以直接使用宏定义PTHREAD_MUTEX_INITIALIZER完成对锁的初始化:

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

对互斥量加锁和解锁:

#include <pthread.h>

int pthread_mutex_lock(pthread_mutex_t *mutex);
int pthread_mutex_trylock(pthread_mutex_t *mutex);
int pthread_mutex_unlock(pthread_mutex_t *mutex);

其中,pthread_mutex_trylock()会尝试对互斥量加锁,如果此时互斥量没有被锁住,加锁成功。如果互斥量已经被其他线程锁住了,返回EBUSY。还有一个可以执行超时加锁的函数:

#include <pthread.h>
#include <time.h>

int pthread_mutex_timedlock(pthread_mutex_t *restrict mutex,
        const struct timespec *restrict abs_timeout);

对加锁设置超时时间,如果在指定时间内,加锁不成功(即锁还没有被释放或者释放了又被其他线程抢占了),加锁失败,直接返回,不阻塞线程了。

二、使用示例

分别创建50个线程,每个线程对全局变量n执行1000次自加操作:

#include <stdio.h>
#include <pthread.h>
#include "log.h"

static int n = 0;

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;

// 线程函数
void *f(void *args) {
    int idx = 0;
    // n自加1000次
    while (idx++ < 1000) {
        pthread_mutex_lock(&mutex);
        n++;
        pthread_mutex_unlock(&mutex);
    }
    return 0;
}

int main() {
    int i;
    pthread_t ts[50];
    // 创建50个线程
    for (i = 0; i < 50; i++) {
        pthread_create(&ts[i], 0, f, 0);
    }

    // 回收线程
    for (i = 0; i < 50; i++) {
        pthread_join(ts[i], 0);
    }

    pthread_mutex_destroy(&mutex);
    info("n = %d\n", n);

    return 0;
}

执行结果:

mutex.png

三、死锁

当多个线程之间,对多个锁产生不同的竞争关系时,就有可能发生死锁现象。

注意:死锁只会发生在一个线程试图锁住另一个线程以相反顺序锁住的互斥量

例如此时有A/B两个锁,t1t2两个线程分别执行以下抢占:

  1. t1对A加锁,t2对B加锁,两者都加锁成功
  2. t2再对A加锁,t1对B加锁,此刻t1会等待t2释放B锁,而t2又要等待t1先释放A锁,此时产生死锁。

一、读写锁

读写锁和互斥量相似,都是对共享空间执行加锁和解锁的过程。不过,读写锁比互斥量有更高的并行性。

读写锁有三种状态:读模式加锁、写模式加锁和不加锁。读写锁只允许同时有一个线程占有写模式的锁。

这就意味着读锁可以同时被多个线程拥有,读写锁特别适合读次数远大于写次数的场景。

1.1 创建并初始化一把锁

读写锁的数据类型为pthread_rwlock_t,使用以下方式初始化和销毁:

#include <pthread.h>

int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock,
      const pthread_rwlockattr_t *restrict attr);
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);

1.2 加锁和解锁

#include <pthread.h>

// 加读锁
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_tryrdlock(pthread_rwlock_t *rwlock);
// 加写锁
int pthread_rwlock_trywrlock(pthread_rwlock_t *rwlock);
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
// 解锁
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);

pthread_rwlock_rdlock()可以同时对一把锁使用,不会阻塞线程。

1.3 带超时的读写锁

#include <pthread.h>
#include <time.h>

int pthread_rwlock_timedrdlock(pthread_rwlock_t *restrict rwlock,
      const struct timespec *restrict abs_timeou
int pthread_rwlock_timedwrlock(pthread_rwlock_t *restrict rwlock,
      const struct timespec *restrict abs_timeout);

二、示例

#include <stdio.h>
#include <pthread.h>
#include "log.h"

static int n = 0;

pthread_rwlock_t rwlock;

// 线程函数
void *f(void *args) {
    int idx = 0;
    while (idx++ < 1000) {
        pthread_rwlock_wrlock(&rwlock);
        n++;
        pthread_rwlock_unlock(&rwlock);
    }
    return 0;
}

int main() {
    int i;
    pthread_t ts[50];

    pthread_rwlock_init(&rwlock, NULL);

    for (i = 0; i < 50; i++) {
        pthread_create(&ts[i], 0, f, 0);
    }

    for (i = 0; i < 50; i++) {
        pthread_join(ts[i], 0);
    }

    pthread_rwlock_destroy(&rwlock);
    info("n = %d", n);

    return 0;
}

执行结果:

rwlock.png

一、自旋锁

自旋锁和互斥量相似,通过加锁和解锁来保护对共享数据的保护。

和互斥量不同的是:互斥量在锁已经被占有的情况下,会阻塞等待,此时线程处于休眠状态,不占用CPU资源。而自旋锁此时虽然也会被阻塞,但是不会休眠,处于忙等的状态,不会交出CPU资源。

自旋锁适用于那些锁持有时间短不希望把时间浪费在CPU调度上的场景。

linux内核中,自旋锁被广泛的使用,因为内核模块特别是在网卡驱动中,时间精度要求很高,相比其他锁切换时间,自旋锁的忙等效率更高一些。

自旋锁的数据类型为pthread_spinlock_t,相关函数为:

#include <pthread.h>

// 初始化和销毁自旋锁
int pthread_spin_init(pthread_spinlock_t *lock, int pshared);
int pthread_spin_destroy(pthread_spinlock_t *lock);

// 对自旋锁加锁和解锁
int pthread_spin_lock(pthread_spinlock_t *lock);
int pthread_spin_trylock(pthread_spinlock_t *lock);
int pthread_spin_unlock(pthread_spinlock_t *lock);

自旋锁的函数定义基本和其他锁一致,具体使用方法不再详细描述。

二、示例

#include <stdio.h>
#include <pthread.h>
#include "log.h"

static int n = 0;

pthread_spinlock_t spin_lock;

static void *f(void *args) {
    int idx = 0;
    while (idx++ < 1000) {
        pthread_spin_lock(&spin_lock);
        n++;
        pthread_spin_unlock(&spin_lock);
    }
    return 0;
}

int main() {
    int i;
    pthread_spin_init(&spin_lock, 0);

    pthread_t ts[50];
    for (i = 0; i < 50; i++) {
        pthread_create(&ts[i], 0, f, 0);
    }

    for (i = 0; i < 50; i++) {
        pthread_join(ts[i], 0);
    }
    pthread_spin_destroy(&spin_lock);

    info("n = %d", n);

    return 0;
}

编译运行:

 一、条件变量

条件变量是多线程的一种同步方式,它允许多个线程以无竞争的方式等待特定事件发生。无竞争的意思是,当条件满足时,条件满足这个讯号会发送给所有的监听者线程,但多个线程中只有一个能获取到特定事件。条件变量需要配合互斥量一起使用。

相关数据结构和函数:

#include <pthread.h>

// 销毁条件变量
int pthread_cond_destroy(pthread_cond_t *cond);
// 初始化条件变量
int pthread_cond_init(pthread_cond_t *restrict cond,
       const pthread_condattr_t *restrict attr);
// 初始化条件变量
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

条件变量的数据结构为pthread_cond_t,和其他同步的数据结构一样,也提供了两种方式来初始化。一种是通过赋值的方式,一种是通过函数的方式。在使用pthread_cond_init对初始化条件变量的时候,attr参数一般为NULL。

对条件变量加锁的方式:

#include <pthread.h>

// 等待条件满足
int pthread_cond_wait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex);
// 带有超时条件的等待
int pthread_cond_timedwait(pthread_cond_t *restrict cond,
       pthread_mutex_t *restrict mutex,
       const struct timespec *restrict abstime);

在对条件变量加锁的时候,需要传入一把已经处于加锁状态的互斥量,此时函数会把当前线程放到条件等待的列表上并对互斥量解锁。此时线程进入条件等待状态,当有信号到达时便会触发。

通知条件满足的函数:

#include <pthread.h>
int pthread_cond_signal(pthread_cond_t *cond);
int pthread_cond_broadcast(pthread_cond_t *cond);

pthread_cond_signal函数会唤醒条件等待队列上的至少一个线程,pthread_cond_broadcast会唤醒条件队列上的所有线程。

条件队列一般用于消息队列,用于统一、协调多个生产者和消费者之间的竞争关系。

二、使用示例

以下使用了三个线程作为消费者,分别从全局队列上获取消息消费:

#include <stdio.h>
#include <unistd.h>
#include <pthread.h>
#include "log.h"

#define THREAD_COUNT 3

// 消息结构
struct msg_st {
    unsigned int msg_id;
    struct msg_st *next;
};

static struct msg_st *msg_queue = NULL;
static pthread_cond_t g_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t g_mutex = PTHREAD_MUTEX_INITIALIZER;

// 消费线程函数
void *handle_msg(void *arg) {
    struct msg_st *msg;
    while (1) {
        pthread_mutex_lock(&g_mutex);
        while (msg_queue == NULL) {
            pthread_cond_wait(&g_cond, &g_mutex);
        }

        // 提取消息
        msg = msg_queue;
        msg_queue = msg_queue->next;
        pthread_mutex_unlock(&g_mutex);

        // 退出
        if (msg->msg_id == (unsigned int) -1) {
            info("Thread 0x%x exit!", (unsigned int) pthread_self());
            break;
        }

        info("Thread 0x%x: msg_id = %u", (unsigned int) pthread_self(), msg->msg_id);
    }
    return NULL;
}

// 生产消息函数
void create_msg(struct msg_st *msg) {
    pthread_mutex_lock(&g_mutex);
    msg->next = msg_queue;
    msg_queue = msg;
    pthread_mutex_unlock(&g_mutex);
    pthread_cond_signal(&g_cond);
}

int main() {
    int i;
    struct msg_st msg[10];
    pthread_t pid[THREAD_COUNT];

    debug("start create threads");
    // 创建3个线程作为消费者
    for (i = 0; i < 3; i++) {
        pthread_create(&pid[i], NULL, handle_msg, NULL);
    }

    debug("start create msgs");
    // 生产10个消息
    for (i = 0; i < 10; i++) {
        msg[i].msg_id = i + 1;
        msg[i].next = NULL;
        create_msg(&msg[i]);
    }

    // 休眠1秒,确保所有消息都被消费者消费完成
    sleep(1);

    // 退出所有线程
    debug("start create exit msgs");
    for (i = 0; i < THREAD_COUNT; i++) {
        msg[i].msg_id = (unsigned int) -1;
        msg[i].next = NULL;
        create_msg(&msg[i]);
    }

    // 回收所有线程
    debug("start join threads");
    for (i = 0; i < THREAD_COUNT; i++) {
        pthread_join(pid[i], NULL);
    }

    debug("program end");

    return 0;
}

运行结果,10个消息分别被3个线程打印出来了:

三、其他

3.1 “惊群”效应

条件变量是否存在惊群效应呢?

不会,线程执行wait操作的时候会被放到一个条件等待队列里面去。当条件满足的时候,系统会自动选择队列前面的线程来消费队列。

3.2 为什么wait前要加锁,这样不会死锁吗?

不会,执行wait操作时需要的是一把已经加锁的互斥量,这个锁在wait函数中会解开。

这样做的目的:如果在wait前没有加锁,生产者线程产生了一个消息并发送信号,再执行wait后信号就丢失了。

3.3 生产者的信号为什么放在unlock之后?

wait收到信号之后要对互斥量加锁,此时锁还被生产者持有,消费者依旧还要等待锁的释放,才能持有锁。对消费者而言,最理想的情况就是生产者产生消息后,我立马就能读取消息,此时的互斥量是用来和其他消费者线程同步的,而不是和生产者线程同步。