如果逻辑控制流在时间上重叠,那么它们就是 并发的(concurrent) 。现代操作系统提供三种基本的构造并发程序的方法:
使用进程构造并发程序,主要使用fork
、exec
和waitpid
这些函数。
构造一个并发服务器,可以在父进程中接受客户端连接请求,然后创建一个新的子进程为每个客户端提供服务。
进程共享文件表,但是不共享用户地址空间。
进程有独立的地址空间,一个进程不可能不小心覆盖另一个进程的虚拟内存,但使得进程共享状态信息更加困难。
基于进程的并发往往比较慢,因为进程控制和IPC开销很高。
// 基于进程的并发服务器 #include "csapp.h" void echo(int connfd); void sigchld_handler(int sig) { while(waitpid(-1, 0, WNOHANG) > 0) {} return; } int main(int argc, char **argv) { int listenfd, connfd; socklen_t clientlen; struct sockaddr_storage clientaddr; /* Enough space for any address */ char client_hostname[MAXLINE], client_port[MAXLINE]; if(argc != 2) { fprintf(stderr, "usage: %s <port>\n", argv[0]); exit(0); } Signal(SIGCHLD, sigchld_handler); listenfd = Open_listenfd(argv[1]); while(1) { clientlen = sizeof(struct sockaddr_storage); connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); if(Fork() == 0) { Close(listenfd); /* Child closes its listening socket */ Getnameinfo((SA *) &clientaddr, clientlen, client_hostname, MAXLINE, client_port, MAXLINE, 0); printf("Connected to (%s, %s)\n", client_hostname, client_port); echo(connfd); /* Child services client */ Close(connfd); /* Child closes connection with client */ exit(0); /* Child exits */ } Close(connfd); /* Parent closes connected socket (important!) */ } exit(0); }
I/O多路复用技术的基本思路是 使用select函数,要求内核挂起进程,只有在一个或多个I/O事件发生后,才将控制返回给应用程序。
select
是一个复杂的函数,这里只讨论等待一组描述符准备好读的情况。
select
处理类型为fd_set的集合,也叫做描述符集合,描述符集合可以看成一个大小为n的位向量,每个位bk 对应于描述符k,当且仅当bk=1,描述符k才表明是描述符集合的一个元素。对描述符集合,只可以进行分配、同类赋值和使用FD_ZERO
、FD_CLR
、FD_SET
、FD_ISSET
修改和检查它们。
#include <sys/select.h> // 返回已准备好的非零描述符的个数,若出错则为-1 // fdset表示读集合的描述符集合 // n表示读集合的基数,实际上是任何描述符集合的最大基数 // select函数会一直阻塞直到读集合中至少有一个描述符准备好可以读 // 当前仅当一个从该描述符读取一个字节的请求不会阻塞时,描述符k就准备好可以读了 // select会修改fdsse指向的fd_set,指明读集合的一个子集,称为准备好集合,由准备好可以读的描述符组成 // 因此每次调用select前都要更新读集合 int select(int n, fd_set *fdset, NULL, NULL, NULL); FD_ZERO(fd_set *fdset); // Clear all bits in fdset FD_CLR(int fd, fd_set *fdset); // Clear bit fd in fdset FD_SET(int fd, fd_set *fdset); // Turn on bit fd in fdset FD_ISSET(iny fd, fd_SET *fdset); // Is bit fd in fdset on?
I/O多路复用可以作为并发事件驱动程序的基础,在事件驱动程序中,某些事件会导致流向前推进。
服务器使用I/O多路复用,借助select
函数检测输入事件的发生,当每个已连接描述符准备好可读时,服务器就为相应的状态机执行转移,这里即为从描述符读和写回一个文本行。
// 基于I/O多路复用的并发事件驱动服务器 #include "csapp.h" typedef struct /* Represents a pool of connected descriptors */ { int maxfd; /* Largest descriptor in read_set */ fd_set read_set; /* Set of all active descriptors */ fd_set ready_set;/* Subset of descriptors ready for reading */ int nready; /* Number of ready descriptors from select */ int maxi; /* High water index into client array */ int clientfd[FD_SETSIZE]; /* Set of active descriptors */ rio_t clientrio[FD_SETSIZE];/* Set of active read buffers */ }pool; int byte_cnt = 0; /* Counts total bytes received by server */ // 初始化客户端池 void init_pool(int listenfd, pool *p) { /* Initially, there are no connected descriptors */ int i; p->maxi = -1; for(i = 0; i < FD_SETSIZE; ++i) p->clientfd[i] = -1; /* Initially, listenfd is only member of select read set */ p->maxfd = listenfd; FD_ZERO(&p->read_set); FD_SET(listenfd, &p->read_set); } // 添加一个新的客户端到活动客户端池中 void add_client(int connfd, pool *p) { int i; --p->nready; for(i = 0; i < FD_SETSIZE; ++i) /* Find an available slot */ { if(p->clientfd[i] < 0) { /* Add connected descriptor to the pool */ p->clientfd[i] = connfd; Rio_readinitb(&p->clientrio[i], connfd); /* Add the descriptor to descriptor set */ FD_SET(connfd, &p->read_set); /* Update max descriptor and pool high water mark */ if(connfd > p->maxfd) p->maxfd = connfd; if(i > p->maxi) p->maxi = i; break; } } if(i == FD_SETSIZE) /* Couldn't find an empty slot */ app_error("add_client error: Too many clients"); } // 回送来自每个准备好的已连接描述符的一个文本行 void check_clients(pool *p) { int i, connfd, n; char buf[MAXLINE]; rio_t rio; for(i = 0; (i <= p->maxi) && (p->nready > 0); ++i) { connfd = p->clientfd[i]; rio = p->clientrio[i]; /* If the descriptor is ready, echo a text line from it */ if((connfd > 0) && (FD_ISSET(connfd, &p->ready_set))) { --p->nready; if((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) { byte_cnt += n; printf("Server reveived %d (%d total) bytes on fd %d\n", n, byte_cnt, connfd); Rio_writen(connfd, buf, n); } else /* EOF detected, remove descriptor from pool */ { Close(connfd); FD_CLR(connfd, &p->read_set); p->clientfd[i] = -1; } } } } int main(int argc, char **argv) { int listenfd, connfd; socklen_t clientlen; struct sockaddr_storage clientaddr; static pool pool; if(argc != 2) { fprintf(stderr, "usage: %s <port>\n", argv[0]); exit(0); } listenfd = Open_listenfd(argv[1]); init_pool(listenfd, &pool); while(1) { /* Wait for listening/connected descriptor(s) to become ready */ pool.ready_set = pool.read_set; pool.nready = Select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL); /* If listening descriptor ready, add new client to pool */ if(FD_ISSET(listenfd, &pool.ready_set)) { clientlen = sizeof(struct sockaddr_storage); connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); add_client(connfd, &pool); } /* Echo a text line from each ready connected descriptor */ check_clients(&pool); } exit(0); }
线程(thread)就是 运行在进程上下文中的逻辑流 。线程由内核自动调度并由内核通过一个整数ID来识别线程,每个线程都有它自己的线程上下文,包括一个唯一的整数线程ID(TID)、栈、栈指针、程序计数器、通用目的寄存器和条件码,所有运行在一个进程里的线程共享该进程的整个虚拟地址空间。
每个进程开始生命周期时都是单一线程,这个线程称为 主线程 。某一时刻,主线程创建一个 对等线程 ,此刻之后两个线程就并发地运行,当主线程执行一个慢速系统调用,控制就会通过上下文切换传递到对等线程,对等线程会执行一段时间,然后控制传递回主线程,依次类推。
Posix线程(Pthreads) 是C程序中处理线程的一个标准接口,允许程序创建、杀死和回收线程,与对等线程安全地共享数据,还可以通知对等线程系统状态的变化。
线程的代码和本地数据通常被封装在一个 线程例程 中。每个线程例程都以一个通用指针作为输入,并返回一个通用指针。如果想要传递多个参数给线程例程,那么应该将参数放到一个结构中,并传递一个指向该结构的指针。
线程通过pthread_create
函数来创建其他线程。
pthread_create
函数创建一个新的线程,并带着一个输入变量arg,在新线程的上下文中运行线程例程f。
使用attr参数来改变创建线程的默认属性,示例中attr属性常设为NULL,这些属性需要进一步的深入学习。
pthread_create
函数返回时,参数tid包含新创建线程的ID。
新线程可以通过调用pthread_self
函数获得它自己的线程ID。
#include <pthread.h> typedef void *(func)(void *); // 成功则返回0,出错返回非零 int pthread_create(pthread_t *tid, pthread_attr_t *attr, func *f, void *argp); // 返回调用者的线程ID pthread_t pthread_self();
pthread_exit
函数,线程会显式地终止,主线程调用pthread_exit
会等待所有其他对等线程终止,然后再终止主线程和整个线程,返回值为thread_return pthread_cancel
函数终止当前线程 #include <pthread.h> void pthread_exit(void *thread_return); // 成功返回0, 出错返回非零 int pthread_cancel(pthread_t tid);
线程通过调用pthread_join
函数等待其他线程终止。
pthread_join
函数会阻塞直到tid终止,将线程例程返回的通用(void *)指针赋值为thread_return指向的位置,然后回收已终止线程占用的所有内存资源。
pthread_join
函数只能等待一个指定的线程终止,不能等待任意一个线程终止。
#include <pthread.h> // 成功返回0, 出错返回非零 int pthread_join(pthread_t tid, void **thread_return);
默认情况,线程被创建成可结合的或者是分离的。为了避免内存泄漏,每个可结合线程都应该要么被其他线程显式地收回,要么通过调用pthread_detach
函数被分离。
#include <pthread.h> // 分离可结合线程tid // 线程通过以pthread_self()为参数调用来分离自己 int pthread_detach(pthread_t tid);
pthread_once
函数允许初始化与线程例程相关的状态,需要动态初始化多个线程共享的全局变量时,这个函数很有用。
#include <pthread.h> // once_control是一个全局或者静态变量,总是被初始化为PTHREAD_ONCE_INIT pthread_once_t once_control = PTHREAD_ONCE_INIT; // 第一次以once_control调用pthread_once时,调用init_routine // 接下来再调用时将不做任何事 int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));
基于线程的并发服务器,主线程不断等待连接请求,然后创建一个对等线程处理该请求。
传递已连接描述符给对等线程时,传递指向这个描述符的指针,为了避免竞争,这个指针指向一个动态分配的内存块。
// 基于线程的并发服务器 #include "csapp.h" void echo(int connfd); void *thread(void *vargp); int main(int argc, char **argv) { int listenfd, *connfdp; socklen_t clientlen; struct sockaddr_storage clientaddr; pthread_t tid; if(argc != 2) { fprintf(stderr, "usage: %s <port>\n", argv[0]); exit(0); } listenfd = Open_listenfd(argv[1]); while(1) { clientlen = sizeof(struct sockaddr_storage); connfdp = Malloc(sizeof(int)); *connfdp = Accept(listenfd, (SA *)&clientaddr, &clientlen); Pthread_create(&tid, NULL, thread, connfdp); } } /* Thread routine */ void *thread(void *vargp) { int connfd = *((int *)vargp); Pthread_detach(pthread_self()); Free(vargp); echo(connfd); Close(connfd); return NULL; }
一个变量v是共享的,当且仅当它的一个实例被一个以上的线程引用。
没有办法预测操作系统是否将为线程选择一个正确的顺序。
操作共享变量的指令构成临界区,要确保在执行临界区中的指令时,拥有对共享变量的互斥访问。为了保证线程化程序的正确执行,必须以某种方式 同步 线程(协同步调,按规定的次序先后执行)。
P操作中的测试和减1是不可分割的,V操作中的加1操作也是不可分割的,即这个过程不能被中断。
V操作必须只能重启一个正在等待的线程,当有多个线程在等待同一个信号量时,不能预测V操作要重启哪一个线程。
信号量不变性 :一个正在运行的程序,正确初始化了的信号量不可能取负值。
Posix定义了许多操作信号量的函数:
#include <semaphore.h> // 将信号量sem初始化为value,每个信号量在使用之前必须初始化 int sem_init(sem_t *sem, 0, unsigned int value); int sem_wait(set_t *s); /* P(s) */ int sem_post(sem_t *s); /* V(s) */ #include "csapp.h" void P(sem_t *s); // Wrapper function for sem_wait void V(sem_t *s); // Wrapper function for sem_post
对共享变量的互斥访问:将每个共享变量与一个信号量s(初始化为1)联系起来,然后用P(s)和V(s)操作将相应的临界区包围起来。
二元信号量:信号量的值总是0或1。
提供互斥为目的的二元信号量称为 互斥锁 ,在一个互斥锁上执行P操作称为对互斥锁加锁,在一个互斥锁上执行V操作称为互斥锁解锁。
信号量的另一个重要作用是调度对共享资源的访问,一个线程用信号量操作来通知另一个线程程序状态中的某个条件已经为真了。
生产者和消费者线程共享一个有n个槽的有限缓冲区,生产者线程反复生成新的项目并把它们插入到缓冲区中,消费者线程不断从缓冲区中取出这些项目,然后使用它们。也可能有多个生产者和多个消费者。
为了构造生产者和消费者程序,建立sbuf_t结构的缓冲区,buf是一个动态分配的n项整数数组,front和rear索引值记录该数组中的第一项和最后一项,三个信号量用来同步对缓冲区的访问。
// sbuf.h #include "csapp.h" typedef struct { int *buf; /* Buffer array */ int n; /* Maximum number of slots */ int front; /* buf[(front + 1) % n] is first item */ int rear; /* buf[rear % n] is last item */ sem_t mutex; /* Protects accesses to buf */ sem_t slots; /* Counts available slots */ sem_t items; /* Counts available items */ }sbuf_t; void sbuf_init(sbuf_t *sp, int n); void sbuf_deinit(sbuf_t *sp); void sbuf_insert(sbuf_t *sp, int item); int sbuf_remove(sbuf_t *sp);
使用sbuf_init
函数初始化缓冲区,sbuf_deinit
函数用来释放缓冲区,sbuf_insert
用来向缓冲区加入一个项目,sbuf_remove
函数用来从缓冲区中取出一个项目。
#include "csapp.h" #include "sbuf.h" /* Create an empty, bounded, shared FIFO buffer with n slots */ void sbuf_init(sbuf_t *sp, int n) { sp->buf = Calloc(n, sizeof(int)); sp->n = n; /* Buffer holds max of n items */ sp->front = sp->rear = 0; /* Empty buffer if front == rear */ Sem_init(&sp->mutex, 0, 1); /* Binary semaphore for locking */ Sem_init(&sp->slots, 0, n); /* Initially, buf has n empty slots */ Sem_init(&sp->items, 0, 0); /* Initiallt, buf has zero data items */ } /* Clean up buffer sp */ void sbuf_deinit(sbuf_t *sp) { Free(sp->buf); } /* Insert item onto the rear of shared buffer sp */ void sbuf_insert(sbuf_t *sp, int item) { P(&sp->slots); /* Wait for available slot */ P(&sp->mutex); /* Lock the buffer */ sp->buf[(++sp->rear)%(sp->n)] = item; /* Insert the item */ V(&sp->mutex); /* Unlock the buffer */ V(&sp->items); /* Announce available item */ } /* Remove and return the first item from buffer sp */ int sbuf_remove(sbuf_t *sp) { P(&sp->items); /* Wait for available item */ P(&sp->mutex); /* Lock the buffer */ int item = sp->buf[(++sp->front)%(sp->n)]; /* Remove the item */ V(&sp->mutex); /* Unlock the buffer */ V(&sp->slots); /* Announce available slot */ return item; }
有些线程只读对象,而其他的线程只修改对象。修改对象的线程叫写者,只读对象的线程叫做读者。
写着必须拥有对对象的独占访问,而读者可以和其他无限多个读者共享对象。
第一类读者写者问题,读者优先,要求不要让读者等待,除非已经把使用对象的权限赋予给了一个写着。
第二类读者写者问题,写者优先,要求一旦写者准备好写就会尽快地完成写操作,一个写者到达后读者必须等待。
这两种读者写者问题都可能 导致饥饿 ,即一个线程无限期地阻塞,无法进展。
第一类读者写者问题的一个方案如下:
// Global variables int readcnt; // Initially = 0 sem_t mutex, w; // Both initially = 1 void reader() { P(&mutex); ++readcnt; if(readcnt == 1) // First in P(&w); V(&mutex); // Reading happens P(&mutex) --readcnt; if(raedcnt == 0) // Last out V(&w); V(&mutex); } void writer() { P(&w); // Writing happens ... V(&w); }
为每一个新客户端创建一个线程导致不小的代价,预线程化的服务器试图使用生产者-消费者模型降低这种开销,服务器由一个主线程和一组工作者线程构成。主线程不断接受来自客户端的连接请求,并将得到的连接描述符放在一个有限缓冲区中,每一个工作者线程反复从共享缓冲区中取出描述符,为客户端服务,然后等待下一个描述符。
// 预线程化的并发服务器 #include "csapp.h" #include "sbuf.h" #define NTHREADS 4 #define SBUFSIZE 16 void echo_cnt(int connfd); void *thread(void *vargp); sbuf_t sbuf; /* Shared buffer of connected descriptors */ int main(int argc, char **argv) { int i, listenfd, connfd; socklen_t clientlen; struct sockaddr_storage clientaddr; pthread_t tid; if(argc != 2) { fprintf(stderr, "usage: %s <port>\n", argv[0]); exit(0); } listenfd = Open_listenfd(argv[1]); sbuf_init(&sbuf, SBUFSIZE); for(i = 0; i < NTHREADS; ++i) /* Create worker threads */ Pthread_create(&tid, NULL, thread, NULL); while(1) { clientlen = sizeof(struct sockaddr_storage); connfd = Accept(listenfd, (SA *)&clientaddr, &clientlen); sbuf_insert(&sbuf, connfd); /* Insert connfd in buffer */ } } void *thread(void *vargp) { Pthread_detach(pthread_self()); while(1) { int connfd = sbuf_remove(&sbuf); /* Remove connfd from buffer */ echo_cnt(connfd); /* Service client */ Close(connfd); } }
其中函数echo_cnt
用来服务客户端,记录从客户端接收到的累计字节数并打印出来,同时其中展示了一个线程例程调用初始化程序包的技术。
// echo-cnt.c #include "csapp.h" static int byte_cnt; /* Byte counter */ static sem_t mutex; /* and the mutex that protects it */ static void init_echo_cnt() { Sem_init(&mutex, 0, 1); byte_cnt = 0; } void echo_cnt(int connfd) { int n; char buf[MAXLINE]; rio_t rio; static pthread_once_t once = PTHREAD_ONCE_INIT; Pthread_once(&once, init_echo_cnt); Rio_readinitb(&rio, connfd); while((n = Rio_readlineb(&rio, buf, MAXLINE)) != 0) { P(&mutex); byte_cnt += n; printf("server received %d (%d total bytes) on fd %d\n", n, byte_cnt, connfd); V(&mutex); Rio_writen(connfd, buf, n); } }
大多数现代机器都具有多核处理器,并发程序在这样的机器上运行得更快,因为操作系统内核在多个核上并行地调度这些并发线程。
并发程序有多条并发流,并行程序是一个运行在多个处理器上的并发程序。
并行编程中, 同步开销巨大 ,要尽可能避免,如果无法避免,必须要用尽可能多的有用计算弥补这个开销。
由于在一个核上多个线程上下文切换的开销,并行程序通常被写为每个核上只运行一个线程。
一个函数被称为线程安全的,当且仅当被多个并发线程反复地调用时,会一直产生正确的结果。
线程不安全函数有四类:
rand
函数是线程不安全的,因为当前调用的结果依赖于前次调用的中间结果。这类函数只能重写,使得它不再使用任何static数据,而是依靠调用者在参数中传递状态信息。 可重入函数属于线程安全函数,且当它们被多个线程调用时,不会引用任何共享数据。
可重入函数通常比不可重入的线程安全的函数更高效一些,因为它们不需要同步操作。
认识到可重入性有时既是调用者也是被调用者的属性,并不只是被调用者单独的属性是非常重要的。
当一个程序的正确性依赖于一个线程要在另一个线程到达y点之前到达它的控制流中的x点时,就会发生 竞争 。
多线程的程序必须对任何可行的轨迹都正确工作 。
死锁(deadlock) 指的是一组线程被阻塞了,等待一个永远也不会为真的条件。
程序死锁是因为每个线程都在等待其他线程执行一个根本不可能发生的V操作。
使用互斥锁加锁顺序规则可以避免死锁: 给定所有互斥操作的一个全序,如果每个线程都是以同一种顺序获得互斥锁并以相反的顺序释放,那么这个程序就是无死锁的 。
a
--
123456789
更改id为3
--
test
更改id为2
--
commentor
伪造名称???
--
hhh
伪造名称???
--
yayay