第十二章 并发编程


如果逻辑控制流在时间上重叠,那么它们就是 并发的(concurrent) 。现代操作系统提供三种基本的构造并发程序的方法:

  1. 进程 :每个逻辑控制流都是一个进程,由内核调度和维护,进程有独立的虚拟地址空间,控制流必须使用某种显式的进程间通信(IPC)机制和其他流通信。
  2. I/O多路复用 :应用程序在一个进程的上下文中显式地调度它们自己的逻辑流,所有流共享同一个地址空间。
  3. 线程 :线程是运行在一个单一进程上下文中的逻辑流,由内核进行调度,且每个线程共享同一个虚拟地址空间。

基于进程的并发编程


使用进程构造并发程序,主要使用forkexecwaitpid这些函数。
构造一个并发服务器,可以在父进程中接受客户端连接请求,然后创建一个新的子进程为每个客户端提供服务。

基于进程的并发服务器需要注意:

  • 父进程需要包括SIGCHLD处理程序回收僵死子进程的资源,Linux信号是不排队的,SIGCHLD处理程序必须准备好回收多个僵死子进程的资源
  • 父子进程必须关闭它们各自的已连接描述符connfd副本,以避免内存泄漏,套接字文件表表项中引用计数,所以只有父子进程的connfd都关闭了,到客户端的连接才会终止

进程共享文件表,但是不共享用户地址空间。
进程有独立的地址空间,一个进程不可能不小心覆盖另一个进程的虚拟内存,但使得进程共享状态信息更加困难。
基于进程的并发往往比较慢,因为进程控制和IPC开销很高。

// 基于进程的并发服务器  
#include "csapp.h"  
void echo(int connfd);  
void sigchld_handler(int sig)  
{  
    while(waitpid(-1, 0, WNOHANG) > 0) {}  
    return;  
}  
int main(int argc, char **argv)  
{  
    int listenfd, connfd;  
    socklen_t clientlen;  
    struct sockaddr_storage clientaddr;   /* Enough space for any address */  
    char client_hostname[MAXLINE], client_port[MAXLINE];  
    if(argc != 2)  
    {  
        fprintf(stderr, "usage: %s <port>\n", argv[0]);  
        exit(0);  
    }  
    Signal(SIGCHLD, sigchld_handler);  
    listenfd = Open_listenfd(argv[1]);  
    while(1)  
    {  
        clientlen = sizeof(struct sockaddr_storage);  
        connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);  
        if(Fork() == 0)  
        {  
            Close(listenfd);  /* Child closes its listening socket */  
            Getnameinfo((SA *) &clientaddr, clientlen, client_hostname, MAXLINE, client_port, MAXLINE, 0);  
            printf("Connected to (%s, %s)\n", client_hostname, client_port);  
            echo(connfd);  /* Child services client */  
            Close(connfd); /* Child closes connection with client */  
            exit(0);       /* Child exits */  
        }  
        Close(connfd);     /* Parent closes connected socket (important!) */  
    }  
    exit(0);  
}  

基于I/O多路复用的并发编程


I/O多路复用技术的基本思路是 使用select函数,要求内核挂起进程,只有在一个或多个I/O事件发生后,才将控制返回给应用程序。
select是一个复杂的函数,这里只讨论等待一组描述符准备好读的情况。
select处理类型为fd_set的集合,也叫做描述符集合,描述符集合可以看成一个大小为n的位向量,每个位bk 对应于描述符k,当且仅当bk=1,描述符k才表明是描述符集合的一个元素。对描述符集合,只可以进行分配、同类赋值和使用FD_ZEROFD_CLRFD_SETFD_ISSET修改和检查它们。

#include <sys/select.h>  
// 返回已准备好的非零描述符的个数,若出错则为-1  
// fdset表示读集合的描述符集合  
// n表示读集合的基数,实际上是任何描述符集合的最大基数  
// select函数会一直阻塞直到读集合中至少有一个描述符准备好可以读  
// 当前仅当一个从该描述符读取一个字节的请求不会阻塞时,描述符k就准备好可以读了  
// select会修改fdsse指向的fd_set,指明读集合的一个子集,称为准备好集合,由准备好可以读的描述符组成  
// 因此每次调用select前都要更新读集合  
int select(int n, fd_set *fdset, NULL, NULL, NULL);  
FD_ZERO(fd_set *fdset);         // Clear all bits in fdset  
FD_CLR(int fd, fd_set *fdset);  // Clear bit fd in fdset  
FD_SET(int fd, fd_set *fdset);  // Turn on bit fd in fdset  
FD_ISSET(iny fd, fd_SET *fdset); // Is bit fd in fdset on?  

I/O多路复用可以作为并发事件驱动程序的基础,在事件驱动程序中,某些事件会导致流向前推进。
服务器使用I/O多路复用,借助select函数检测输入事件的发生,当每个已连接描述符准备好可读时,服务器就为相应的状态机执行转移,这里即为从描述符读和写回一个文本行。

事件驱动设计的优缺点

  • 基于I/O多路复用的事件驱动服务器是运行在单一进程上下文中的,每个逻辑流都能访问该进程的全部地址空间,流之间共享数据很简单。
  • 事件驱动设计比基于进程的设计给了程序员更多的对程序行为的控制。
  • 事件驱动设计常常比基于进程的设计要高效的多,因为它们不需要进程上下切换来调度新的流。
  • 事件驱动设计的一个缺点是编码复杂。随着并发粒度减小(这里粒度指每个逻辑流每个时间片执行的指令数量),复杂性还会上升。
  • 基于事件驱动的设计另一个缺点是不能充分利用多核处理器。
// 基于I/O多路复用的并发事件驱动服务器  
#include "csapp.h"  
typedef struct   /* Represents a pool of connected descriptors */  
{  
    int maxfd;       /* Largest descriptor in read_set */  
    fd_set read_set; /* Set of all active descriptors */  
    fd_set ready_set;/* Subset of descriptors ready for reading */  
    int nready;      /* Number of ready descriptors from select */  
    int maxi;        /* High water index into client array */  
    int clientfd[FD_SETSIZE];   /* Set of active descriptors */  
    rio_t clientrio[FD_SETSIZE];/* Set of active read buffers */  
}pool;  
int byte_cnt = 0;   /* Counts total bytes received by server */  

// 初始化客户端池  
void init_pool(int listenfd, pool *p)  
{  
    /* Initially, there are no connected descriptors */  
    int i;  
    p->maxi = -1;  
    for(i = 0; i < FD_SETSIZE; ++i)  
        p->clientfd[i] = -1;  
    /* Initially, listenfd is only member of select read set */  
    p->maxfd = listenfd;  
    FD_ZERO(&p->read_set);  
    FD_SET(listenfd, &p->read_set);  
}  

// 添加一个新的客户端到活动客户端池中  
void add_client(int connfd, pool *p)  
{  
    int i;  
    --p->nready;  
    for(i = 0; i < FD_SETSIZE; ++i)   /* Find an available slot */  
    {  
        if(p->clientfd[i] < 0)  
        {  
            /* Add connected descriptor to the pool */  
            p->clientfd[i] = connfd;  
            Rio_readinitb(&p->clientrio[i], connfd);  
            /* Add the descriptor to descriptor set */  
            FD_SET(connfd, &p->read_set);  
            /* Update max descriptor and pool high water mark */  
            if(connfd > p->maxfd)  
                p->maxfd = connfd;  
            if(i > p->maxi)  
                p->maxi = i;  
            break;  
        }  
    }  
    if(i == FD_SETSIZE)  /* Couldn't find an empty slot */  
        app_error("add_client error: Too many clients");  
}  

// 回送来自每个准备好的已连接描述符的一个文本行  
void check_clients(pool *p)  
{  
    int i, connfd, n;  
    char buf[MAXLINE];  
    rio_t rio;  
    for(i = 0; (i <= p->maxi) && (p->nready > 0); ++i)  
    {  
        connfd = p->clientfd[i];  
        rio = p->clientrio[i];  
        /* If the descriptor is ready, echo a text line from it */  
        if((connfd > 0) && (FD_ISSET(connfd, &p->ready_set)))  
        {  
            --p->nready;  
            if((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0)  
            {  
                byte_cnt += n;  
                printf("Server reveived %d (%d total) bytes on fd %d\n", n, byte_cnt, connfd);  
                Rio_writen(connfd, buf, n);  
            }  
            else  /* EOF detected, remove descriptor from pool */  
            {  
                Close(connfd);  
                FD_CLR(connfd, &p->read_set);  
                p->clientfd[i] = -1;  
            }  
        }  
    }  
}  

int main(int argc, char **argv)  
{  
    int listenfd, connfd;  
    socklen_t clientlen;  
    struct sockaddr_storage clientaddr;  
    static pool pool;  
    if(argc != 2)  
    {  
        fprintf(stderr, "usage: %s <port>\n", argv[0]);  
        exit(0);  
    }  
    listenfd = Open_listenfd(argv[1]);  
    init_pool(listenfd, &pool);  
    while(1)  
    {  
        /* Wait for listening/connected descriptor(s) to become ready */  
        pool.ready_set = pool.read_set;  
        pool.nready = Select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);  
        /* If listening descriptor ready, add new client to pool */  
        if(FD_ISSET(listenfd, &pool.ready_set))  
        {  
            clientlen = sizeof(struct sockaddr_storage);  
            connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);  
            add_client(connfd, &pool);  
        }  
        /* Echo a text line from each ready connected descriptor */  
        check_clients(&pool);  
    }  
    exit(0);  
}  

基于线程的并发编程


线程(thread)就是 运行在进程上下文中的逻辑流 。线程由内核自动调度并由内核通过一个整数ID来识别线程,每个线程都有它自己的线程上下文,包括一个唯一的整数线程ID(TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码,所有运行在一个进程里的线程共享该进程的整个虚拟地址空间。

每个进程开始生命周期时都是单一线程,这个线程称为 主线程 。某一时刻,主线程创建一个 对等线程 ,此刻之后两个线程就并发地运行,当主线程执行一个慢速系统调用,控制就会通过上下文切换传递到对等线程,对等线程会执行一段时间,然后控制传递回主线程,依次类推。

线程与进程的区别

  • 线程上下文要比进程上下文小得多,线程上下文切换要比进程上下文切换快得多。
  • 线程不像进程那样,不是按照严格的父子层次来组织的
  • 主线程和其他线程的区别仅在于它总是第一个运行的线程
  • 一个线程可以杀死它的任何对等线程或者等待它的任意对等线程终止,每个对等线程都能读写相同的共享数据

Posix线程(Pthreads) 是C程序中处理线程的一个标准接口,允许程序创建、杀死和回收线程,与对等线程安全地共享数据,还可以通知对等线程系统状态的变化。
线程的代码和本地数据通常被封装在一个 线程例程 中。每个线程例程都以一个通用指针作为输入,并返回一个通用指针。如果想要传递多个参数给线程例程,那么应该将参数放到一个结构中,并传递一个指向该结构的指针。

线程通过pthread_create函数来创建其他线程。
pthread_create函数创建一个新的线程,并带着一个输入变量arg,在新线程的上下文中运行线程例程f。
使用attr参数来改变创建线程的默认属性,示例中attr属性常设为NULL,这些属性需要进一步的深入学习。
pthread_create函数返回时,参数tid包含新创建线程的ID。
新线程可以通过调用pthread_self函数获得它自己的线程ID。

#include <pthread.h>  
typedef void *(func)(void *);  
// 成功则返回0,出错返回非零  
int pthread_create(pthread_t *tid, pthread_attr_t *attr, func *f, void *argp);  
// 返回调用者的线程ID  
pthread_t pthread_self();  

一个线程以下列方式之一终止:

  • 顶层线程例程返回时,线程会隐式地终止
  • 通过调用pthread_exit函数,线程会显式地终止,主线程调用pthread_exit会等待所有其他对等线程终止,然后再终止主线程和整个线程,返回值为thread_return
  • 某个对等线程调用Linux的exit函数,该函数终止进程以及所有与该进程相关的线程
  • 另一个对等线程通过以当前线程ID作为参数调用pthread_cancel函数终止当前线程
#include <pthread.h>  
void pthread_exit(void *thread_return);  
// 成功返回0, 出错返回非零  
int pthread_cancel(pthread_t tid);  

线程通过调用pthread_join函数等待其他线程终止。
pthread_join函数会阻塞直到tid终止,将线程例程返回的通用(void *)指针赋值为thread_return指向的位置,然后回收已终止线程占用的所有内存资源。
pthread_join函数只能等待一个指定的线程终止,不能等待任意一个线程终止。

#include <pthread.h>  
// 成功返回0, 出错返回非零  
int pthread_join(pthread_t tid, void **thread_return);  

任何一个时间点上,线程是可结合的或分离的

  • 可结合的线程能够被其他线程收回和杀死,被其他线程回收之前,它的内存资源是不释放的
  • 一个分离的线程是不能被其他线程收回和杀死的,它的内存资源在它终止时由系统自动释放

默认情况,线程被创建成可结合的或者是分离的。为了避免内存泄漏,每个可结合线程都应该要么被其他线程显式地收回,要么通过调用pthread_detach函数被分离。

#include <pthread.h>  
// 分离可结合线程tid  
// 线程通过以pthread_self()为参数调用来分离自己  
int pthread_detach(pthread_t tid);  

pthread_once函数允许初始化与线程例程相关的状态,需要动态初始化多个线程共享的全局变量时,这个函数很有用。

#include <pthread.h>  
// once_control是一个全局或者静态变量,总是被初始化为PTHREAD_ONCE_INIT  
pthread_once_t once_control = PTHREAD_ONCE_INIT;  
// 第一次以once_control调用pthread_once时,调用init_routine  
// 接下来再调用时将不做任何事  
int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));  

基于线程的并发服务器,主线程不断等待连接请求,然后创建一个对等线程处理该请求。
传递已连接描述符给对等线程时,传递指向这个描述符的指针,为了避免竞争,这个指针指向一个动态分配的内存块。

// 基于线程的并发服务器  
#include "csapp.h"  
void echo(int connfd);  
void *thread(void *vargp);  
int main(int argc, char **argv)  
{  
    int listenfd, *connfdp;  
    socklen_t clientlen;  
    struct sockaddr_storage clientaddr;  
    pthread_t tid;  
    if(argc != 2)  
    {  
        fprintf(stderr, "usage: %s <port>\n", argv[0]);  
        exit(0);  
    }  
    listenfd = Open_listenfd(argv[1]);  

    while(1)  
    {  
        clientlen = sizeof(struct sockaddr_storage);  
        connfdp = Malloc(sizeof(int));  
        *connfdp = Accept(listenfd, (SA *)&clientaddr, &clientlen);  
        Pthread_create(&tid, NULL, thread, connfdp);  
    }  
}  

/* Thread routine */  
void *thread(void *vargp)  
{  
    int connfd = *((int *)vargp);  
    Pthread_detach(pthread_self());  
    Free(vargp);  
    echo(connfd);  
    Close(connfd);  
    return NULL;  
}  

多线程程序中的变量

  • 全局变量:定义在函数之外,运行时虚拟内存的读写区域只包含每个全局变量的一个实例,任何线程都可以引用
  • 本地自动变量:定义在函数内部且没有static属性,运行时每个线程的栈都包含它自己的所有本地自动变量实例
  • 本地静态变量:定义在函数内部且有static属性,虚拟内存的读写区域只包含在程序中声明的本地静态变量的一个实例,每个对等线程都可以读和写这个实例

一个变量v是共享的,当且仅当它的一个实例被一个以上的线程引用。

用信号量同步线程


没有办法预测操作系统是否将为线程选择一个正确的顺序。
操作共享变量的指令构成临界区,要确保在执行临界区中的指令时,拥有对共享变量的互斥访问。为了保证线程化程序的正确执行,必须以某种方式 同步 线程(协同步调,按规定的次序先后执行)。

信号量是具有非负整数值的全局变量,只能由P操作和V操作来处理

  • P(s):如果s是非零的,那么P将s减1,并且立即返回;如果s为零,那么挂起这个线程,直到s变为非零,而一个V操作会重启这个线程,重启之后P操作将s减1并将控制返回给调用者
  • V(s):V操作将s加1,如果有任何线程阻塞在P操作等待s变成非零,那么V操作会重启这些线程中的一个,然后该线程将s减1,完成它的P操作

P操作中的测试和减1是不可分割的,V操作中的加1操作也是不可分割的,即这个过程不能被中断。
V操作必须只能重启一个正在等待的线程,当有多个线程在等待同一个信号量时,不能预测V操作要重启哪一个线程。
信号量不变性 :一个正在运行的程序,正确初始化了的信号量不可能取负值。
Posix定义了许多操作信号量的函数:

#include <semaphore.h>  
// 将信号量sem初始化为value,每个信号量在使用之前必须初始化  
int sem_init(sem_t *sem, 0, unsigned int value);  
int sem_wait(set_t *s);   /* P(s) */  
int sem_post(sem_t *s);   /* V(s) */  

#include "csapp.h"  
void P(sem_t *s);   // Wrapper function for sem_wait  
void V(sem_t *s);   // Wrapper function for sem_post  

对共享变量的互斥访问:将每个共享变量与一个信号量s(初始化为1)联系起来,然后用P(s)和V(s)操作将相应的临界区包围起来。
二元信号量:信号量的值总是0或1。
提供互斥为目的的二元信号量称为 互斥锁 ,在一个互斥锁上执行P操作称为对互斥锁加锁,在一个互斥锁上执行V操作称为互斥锁解锁。

信号量的另一个重要作用是调度对共享资源的访问,一个线程用信号量操作来通知另一个线程程序状态中的某个条件已经为真了。

生产者-消费者问题

生产者和消费者线程共享一个有n个槽的有限缓冲区,生产者线程反复生成新的项目并把它们插入到缓冲区中,消费者线程不断从缓冲区中取出这些项目,然后使用它们。也可能有多个生产者和多个消费者。

  • 插入和取出项目都涉及更新共享变量,必须保证对缓冲区的访问是互斥的。
  • 如果缓冲区是满的,那么生产者必须等待直到有一个槽位变为可用。
  • 如果缓冲区是空的,那么消费者必须等待直到有一个项目变为可用。

为了构造生产者和消费者程序,建立sbuf_t结构的缓冲区,buf是一个动态分配的n项整数数组,front和rear索引值记录该数组中的第一项和最后一项,三个信号量用来同步对缓冲区的访问。

// sbuf.h  
#include "csapp.h"  
typedef struct  
{  
    int *buf;    /* Buffer array */  
    int n;       /* Maximum number of slots */  
    int front;   /* buf[(front + 1) % n] is first item */  
    int rear;    /* buf[rear % n] is last item */  
    sem_t mutex; /* Protects accesses to buf */  
    sem_t slots; /* Counts available slots */  
    sem_t items; /* Counts available items */  
}sbuf_t;  
void sbuf_init(sbuf_t *sp, int n);  
void sbuf_deinit(sbuf_t *sp);  
void sbuf_insert(sbuf_t *sp, int item);  
int sbuf_remove(sbuf_t *sp);  

使用sbuf_init函数初始化缓冲区,sbuf_deinit函数用来释放缓冲区,sbuf_insert用来向缓冲区加入一个项目,sbuf_remove函数用来从缓冲区中取出一个项目。

#include "csapp.h"  
#include "sbuf.h"  
/* Create an empty, bounded, shared FIFO buffer with n slots */  
void sbuf_init(sbuf_t *sp, int n)  

{  
    sp->buf = Calloc(n, sizeof(int));  
    sp->n = n;                  /* Buffer holds max of n items */  
    sp->front = sp->rear = 0;   /* Empty buffer if front == rear */  
    Sem_init(&sp->mutex, 0, 1); /* Binary semaphore for locking */  
    Sem_init(&sp->slots, 0, n); /* Initially, buf has n empty slots */  
    Sem_init(&sp->items, 0, 0); /* Initiallt, buf has zero data items */  
}  

/* Clean up buffer sp */  
void sbuf_deinit(sbuf_t *sp)  
{  
    Free(sp->buf);  
}  

/* Insert item onto the rear of shared buffer sp */  
void sbuf_insert(sbuf_t *sp, int item)  
{  
    P(&sp->slots);                        /* Wait for available slot */  
    P(&sp->mutex);                        /* Lock the buffer */  
    sp->buf[(++sp->rear)%(sp->n)] = item; /* Insert the item */  
    V(&sp->mutex);                        /* Unlock the buffer */  
    V(&sp->items);                        /* Announce available item */  
}  

/* Remove and return the first item from buffer sp */  
int sbuf_remove(sbuf_t *sp)  
{  
    P(&sp->items);                             /* Wait for available item */  
    P(&sp->mutex);                             /* Lock the buffer */  
    int item = sp->buf[(++sp->front)%(sp->n)]; /* Remove the item */  
    V(&sp->mutex);                             /* Unlock the buffer */  
    V(&sp->slots);                             /* Announce available slot */  
    return item;  
}  

读者-写者问题

有些线程只读对象,而其他的线程只修改对象。修改对象的线程叫写者,只读对象的线程叫做读者。
写着必须拥有对对象的独占访问,而读者可以和其他无限多个读者共享对象。
第一类读者写者问题,读者优先,要求不要让读者等待,除非已经把使用对象的权限赋予给了一个写着。
第二类读者写者问题,写者优先,要求一旦写者准备好写就会尽快地完成写操作,一个写者到达后读者必须等待。
这两种读者写者问题都可能 导致饥饿 ,即一个线程无限期地阻塞,无法进展。
第一类读者写者问题的一个方案如下:

// Global variables  
int readcnt;   // Initially = 0  
sem_t mutex, w;  // Both initially = 1  

void reader()  
{  
    P(&mutex);  
    ++readcnt;  
    if(readcnt == 1)   // First in  
        P(&w);  
    V(&mutex);  
    // Reading happens  
    P(&mutex)  
    --readcnt;  
    if(raedcnt == 0)   // Last out  
        V(&w);  
    V(&mutex);  
}  

void writer()  
{  
    P(&w);  
    // Writing happens ...  
    V(&w);  
}  

预线程化地并发服务器

为每一个新客户端创建一个线程导致不小的代价,预线程化的服务器试图使用生产者-消费者模型降低这种开销,服务器由一个主线程和一组工作者线程构成。主线程不断接受来自客户端的连接请求,并将得到的连接描述符放在一个有限缓冲区中,每一个工作者线程反复从共享缓冲区中取出描述符,为客户端服务,然后等待下一个描述符。

// 预线程化的并发服务器  
#include "csapp.h"  
#include "sbuf.h"  
#define NTHREADS 4  
#define SBUFSIZE 16  

void echo_cnt(int connfd);  
void *thread(void *vargp);  

sbuf_t sbuf;   /* Shared buffer of connected descriptors */  

int main(int argc, char **argv)  
{  
    int i, listenfd, connfd;  
    socklen_t clientlen;  
    struct sockaddr_storage clientaddr;  
    pthread_t tid;  

    if(argc != 2)  
    {  
        fprintf(stderr, "usage: %s <port>\n", argv[0]);  
        exit(0);  
    }  
    listenfd = Open_listenfd(argv[1]);  

    sbuf_init(&sbuf, SBUFSIZE);  
    for(i = 0; i < NTHREADS; ++i)    /* Create worker threads */  
        Pthread_create(&tid, NULL, thread, NULL);  

    while(1)  
    {  
        clientlen = sizeof(struct sockaddr_storage);  
        connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen);  
        sbuf_insert(&sbuf, connfd);  /* Insert connfd in buffer */  
    }  
}  

void *thread(void *vargp)  
{  
    Pthread_detach(pthread_self());  
    while(1)  
    {  
        int connfd = sbuf_remove(&sbuf);  /* Remove connfd from buffer */  
        echo_cnt(connfd);                 /* Service client */  
        Close(connfd);  
    }  
}  

其中函数echo_cnt用来服务客户端,记录从客户端接收到的累计字节数并打印出来,同时其中展示了一个线程例程调用初始化程序包的技术。

// echo-cnt.c  
#include "csapp.h"  
static int byte_cnt;   /* Byte counter */  
static sem_t mutex;    /* and the mutex that protects it */  

static void init_echo_cnt()  
{  
    Sem_init(&mutex, 0, 1);  
    byte_cnt = 0;  
}  

void echo_cnt(int connfd)  
{  
    int n;  
    char buf[MAXLINE];  
    rio_t rio;  
    static pthread_once_t once = PTHREAD_ONCE_INIT;  

    Pthread_once(&once, init_echo_cnt);  
    Rio_readinitb(&rio, connfd);  

    while((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0)  
    {  
        P(&mutex);  
        byte_cnt += n;  
        printf("server received %d (%d total bytes) on fd %d\n", n, byte_cnt, connfd);  
        V(&mutex);  
        Rio_writen(connfd, buf, n);  
    }  
}  

使用线程提高并行性


大多数现代机器都具有多核处理器,并发程序在这样的机器上运行得更快,因为操作系统内核在多个核上并行地调度这些并发线程。
并发程序有多条并发流,并行程序是一个运行在多个处理器上的并发程序。
并行编程中, 同步开销巨大 ,要尽可能避免,如果无法避免,必须要用尽可能多的有用计算弥补这个开销。
由于在一个核上多个线程上下文切换的开销,并行程序通常被写为每个核上只运行一个线程。

其他并发问题


一个函数被称为线程安全的,当且仅当被多个并发线程反复地调用时,会一直产生正确的结果。
线程不安全函数有四类:

  1. 不保护共享变量的函数。这一类函数可以利用向P和V这样的同步操作来保护共享变量,但是同步操作将减慢程序的执行时间。
  2. 保持跨越多个调用的状态的函数。如rand函数是线程不安全的,因为当前调用的结果依赖于前次调用的中间结果。这类函数只能重写,使得它不再使用任何static数据,而是依靠调用者在参数中传递状态信息。
  3. 返回指向静态变量的指针的函数。这种函数的结果在被一个线程使用时,可能会被另一个线程悄悄覆盖。这类函数也可以进行重写,或者使用加锁-复制技术。
  4. 调用线程不安全函数的函数。

可重入函数属于线程安全函数,且当它们被多个线程调用时,不会引用任何共享数据。
可重入函数通常比不可重入的线程安全的函数更高效一些,因为它们不需要同步操作。

认识到可重入性有时既是调用者也是被调用者的属性,并不只是被调用者单独的属性是非常重要的。

当一个程序的正确性依赖于一个线程要在另一个线程到达y点之前到达它的控制流中的x点时,就会发生 竞争
多线程的程序必须对任何可行的轨迹都正确工作

死锁(deadlock) 指的是一组线程被阻塞了,等待一个永远也不会为真的条件。
程序死锁是因为每个线程都在等待其他线程执行一个根本不可能发生的V操作。
使用互斥锁加锁顺序规则可以避免死锁: 给定所有互斥操作的一个全序,如果每个线程都是以同一种顺序获得互斥锁并以相反的顺序释放,那么这个程序就是无死锁的

评论

还没有登陆?评论请先登陆注册

还没有评论,抢个沙发吧!

 联系方式 contact me

Github
Email
QQ
Weibo