网页建设网站代码,多产品网站怎么做企业网站,四平网站建设有哪些,手机在线ps照片处理1.管道
2.信号量
2.1 概念
信号量 是一个计数器#xff0c;用于实现进程间互斥和同步。
信号量的取值可以是任何自然数。 最简单的信号量是只能取 0 和 1 的变量#xff0c;这也是信号量最常见的一种形式#xff0c;叫做二进制信号量#xff08;Binary Semaphore#…1.管道
2.信号量
2.1 概念
信号量 是一个计数器用于实现进程间互斥和同步。
信号量的取值可以是任何自然数。 最简单的信号量是只能取 0 和 1 的变量这也是信号量最常见的一种形式叫做二进制信号量Binary Semaphore。 而可以取多个正整数的信号量被称为通用信号量。
2.2 特点
1. 信号量用于进程间同步若要在进程间传递数据需要结合共享内存。2. 信号量基于操作系统的 PV 操作程序对信号量的操作都是 原子操作。3. 每次对信号量的 PV 操作不仅限于对信号量值加 1 或减 1而且可以加减任意正整数。4. 支持信号量组。2.3 原型
2.3.1 semget 创建或获取一个信号量集函数
#include sys/sem.h// 创建或获取一个信号量集 若成功返回信号量集ID失败返回-1
int semget(key_t key, int num_sems, int sem_flags);key:一个键值用来标识一个全局唯一的信号量集。要通过信号量通信的进程使用相同的 key 来创建或获取该信号量num_sems指定要 创建/获取信号量集中信号量的数目。创建该值必须被指定。获取可以设置为 0.sem_flags指定一组标志。它低端 9 个比特是信号量的权限其格式和含义都与系统调用的 open、model参数相同。此外它还可以和 IPC_CREAT 标志做按位 “或”运算以创建新的信号量集。确保创建一组新的、唯一的信号量集联合使用 IPC_CREAT 和 IPC_EXCL.此时若创建的信号量集已存在则 semget 返回错误 并设置 errno 为 EEXIST.返回成功返回正整数值是信号量级的标识符失败-1并设置 errno扩展若 semget 用于创建信号量集则与之关联的内核数据结构体 semid_ds 将被创建并初始化。semid_ds结构体定义如下#include sys/sem,h //此结构体用于描述 IPC 对象信号量、共享内存和消息队列的权限struct ipc_perm{key_t key; //键值uid_t uid; //所有者的有效用户 IDgid_t gid; //所有者的有效组 IDuid_t cuid; //创建者的有效用户 IDgid_t cgid; //创建者的有效组 IDmode_t mode; //访问权限... //省略其他字段}struct semid_ds{struct ipc_perm sem_perm; //信号量的操作权限unsigned long int sem_nsems; //该信号量集中的信号量数目time_t sem_otime; //最后一次调用 semop 的时间time_t sem_ctime; //最后一次调用 semctl 的时间... //省略其他填充字段}semget 对 semid_ds 结构体初始化包括sem_perm.cuid 和 sem_perm.uid 设置为调用进程的有效用户 IDsem_perm.cgid 和 sem_perm.gid 设置为调用进程的有效组 IDsem_perm.mode 的最低 9 位设置为 sem_flags 参数的最低 9 位sem_nsems 设置为 num_semssem_otime 设置为 0sem_ctime 设置为当前系统时间2.3.1.1 semget 特殊参数 IPC_PRIVATE
可以给 semget 函数传一个特殊的键值 IPC_PRIVATE其值为 0这样无论信号量是否已存在 semget 都将创建一个新的信号量。注意不要因为这个名字就认为这个创建的信号量就是私有的
其他进程尤其是子进程也有方法来访问这个信号量。如下例父、子进程间使用一个 IPC_PRIVATE 信号量来同步#include sys/sem.h
#include stdio.h
#include stdlib.h
#include unistd.h
#include sys/wait.h
#include time.hunion semun
{int val; struct semid_ds* buf; unsigned short int* array;struct seminfo* __buf;
};// op-1 执行 Pop1 执行 V
void pv( int sem_id, int op )
{struct sembuf sem_b;sem_b.sem_num 0;sem_b.sem_op op;sem_b.sem_flg SEM_UNDO;semop( sem_id, sem_b, 1 );
}int main( int argc, char* argv[] )
{int sem_id semget( IPC_PRIVATE, 1, 0666 ); //创建一个信号量集 权限设置为 0666union semun sem_un;sem_un.val 1; semctl( sem_id, 0, SETVAL, sem_un ); //设置创建的信号量集 信号量 的值 为 1pid_t id fork();if( id 0 ){return 1;}else if( id 0 ){time_t cur time( NULL );printf( child try to get binary sem: %s\n ,ctime(cur));//在父子进程间共享 IPC_PRIVATE 信号量的关键在于二者都可以操作该信号量的标识符 sem_idpv( sem_id, -1 ); //Vprintf( child get the sem and would release it after 5 seconds\n );sleep( 5 );pv( sem_id, 1 ); //Ptime_t cur1 time( NULL );printf( child try to get binary sem: %s\n ,ctime(cur1));exit( 0 );}else{time_t cur time( NULL );printf( parent try to get binary sem:%s\n ,ctime(cur));pv( sem_id, -1 ); //Vprintf( parent get the sem and would release it after 5 seconds\n );sleep( 5 );time_t cur1 time( NULL );printf( child try to get binary sem: %s\n ,ctime(cur1));pv( sem_id, 1 ); //P}waitpid( id, NULL, 0 );semctl( sem_id, 0, IPC_RMID, sem_un ); //立即移除信号量集return 0;
}2.3.2 semop 对信号量集操作函数
与信号量关联的一些重要内核变量
unsigned short semval; //信号量的值
unsigned short semzcnt; //等待信号量值变为 0 的进程数量
unsigned short semncnt; //等待信号量值增加的进程数量
pid_t sempid; //最后一次执行 semop 操作的进程 ID// 对信号量集进行操作就是改变上面的内核变量改变信号量的值即执行 P、V 操作
#include sys/sem.h
int semop(int sem_id, struct sembuf * sem_ops, size_t num_sem_ops);sem_id:由 semget 调用返回的信号量集标识符用以指定被操作的目标信号量集。
sem_ops指向一个 sembuf 结构体类型的数组struct sembug{unsigned short int sem_num;short int sem_op;short int sem_flg;} sem_num:信号量集中信号量的编号0表示信号量中第一个信号量。sem_op指定操作类型 可选值为 正整数、0 和 负整数。每种类型的操作行为受 参数 sem_flg 影响。sem_flg:可选值 IPC_NOWAIT SEM_UNDO.含义IPC_NOWAIT :无论信号量操作是否成功semop 调用都将立即返回类似非阻塞 I/O。SEM_UNDO当进程退出时取消正在进行的 semop 操作。sembuf 中参数 sem_op 和 sem_flg 按如下方式影响 函数 semop 的行为1. sem_op 0:将内核信号量的值在增加 sem_op此操作要求调用进程对被操作信号量集拥有写权限。若此时设置了 SEM_UNDO 标志则系统将更新进程的 semadj 变量用以跟踪进程对信号量的修改情况2. sem_op 0:表示这是一个“等待0(wait-for-zero)” 操作此操作要求调用进程对被操作信号量集有 读 权限。1.若此时 信号量值是 0则调用立即成功返回。2.若此时 信号量值不是 0则 semop 函数 失败返回 或阻塞进程以等待信号量变为 0.在这种情况下若 sem_flg 设置了 IPC_NOWAIT semop 函数立即返回一个错误并设置 errno 为 EAGAIN.若未设置 IPC_NOWAIT 则 内核信号量值 1进程被投入睡眠直到下列 3 个条件之一 发生1.信号量的值 semval 变为 0 此时系统将该信号量的 semzcnt 值 -12.被操作信号量所在信号量集被进程移除 此时 semop 调用失败返回errno 设置为 EIDRM3.调用被信号中断此时 semop 调用失败返回 errno 被设置为 EINTR同时系统将 内核信号量 semzcnt 值 -13. sem_op 0表示对信号量值进行减操作。此操作要求调用进程对被操作信号量集拥有写权限。1.若 内核信号量的值 semval 大于或等于 sem_op 的绝对值则 将 semval - |sem_op| 操作成功。此时若设置了 SEM_UNDO 标志则系统将更新进程的 semadj 变量。2.若 内核信号量的值 semval 小于 sem_op 的绝对值则 semop 失败返回或阻塞进程以等待信号量可用。在这种情况下若 IPC_NOWAIT 标志被指定时 sempop 立即返回一个错误并设置 errno 为 EAGAIN.若 IPC_NOWAIT 标志未指定 则 内核信号量 semncnt 值 1进程被投入睡眠直到下列 3 个条件之一 发生1. 信号量 semval 的值变得 sem_op的绝对值此时系统将该信号量的 semncnt 值 -1并将 semval 减去 sem_op 的绝对值。同时如果 SEM_UNDO 标志被设置则系统更新 semadj 变量。2.被操作信号量所在的信号量集被移除此时 semop 调用失败返回 errno 被设置为 EIDRM3.调用被信号中断此时 semop 调用失败返回errno 被设置为 EINTR同时系统将该信号量的 semncnt 值 -1.num_sem_ops指定要执行的操作个数即 sem_ops 数组中元素个数。返回成功0失败-1及 errno。失败时sem_ops 数组中指定所有操作都不执行。扩展semop 函数对数组 sem_op 中每个成员按照数组顺序依次执行且该过程是原子操作不可中断的一个或者一系列操作, 也就是不会被线程调度机制打断的操作, 运行期间不会有任何的上下文切换(context switch)。2.3.3 semctl 对信号量集操作函数
#include sys/sem.h
// 控制信号量集的相关信息
int semctl(int semid, int sem_num, int command, ...);semid:由 semget 调用返回的信号量集标识符以指定被操作的信号量集。
sem_num指定被操作的信号量在信号量集中的编号。
command指定要执行的命令详见下表 13-2有的命令需要第 4 个参数
...这个参数配合 command 使用由用户自己定义但 sys/sem.h 头文件给出推荐格式如下union semun{int val; //用于 SETVAL 命令struct semid_ds * buf; //用于 IPC_STAT 和 IPC_SET 命令unsigned short * array; //用于 GETALL 和 SETALL 命令struct seminfo * __buf; //用于 IPC_INFO 命令};struct seminfo{int semmap; //Linux 内核没有使用int semmni; //系统最多可以拥有的信号量集数目int semmns; //系统最多可以拥有的信号量数目int semmnu; //Linux 内核没有使用int semmsl; //一个信号量集 最多允许包含的信号量数目int semopm; //semop 一次最多能执行的 sem_op 操作数目int semume; //Linux 内核没有使用int semusz; //sem_undo 结构体的大小int semvmx; //最大允许的信号量值//最多允许的 UNDO 次数带 SEM_UNDO 标志的 semop 操作的次数int semaem; }返回成功;返回值取决于 command 参数。失败-1 及 errno。 2.共享内存
2.1 概念
共享内存 是最高效的 IPC 机制因为 不涉及进程之间任何数据传输。
但需要辅助手段来同步进程对共享内存的访问否则会产生竟态条件。Linux 共享内存包含 4 个系统调用shmget、shmat、shmdt、shmctl。2.2 shmget 系统调用
//创建一段新的共享内存或者获取一段已经存在的共享内存#include sys/shm.h
int shmget(key_t key, size_t size, int shmflg);
key:key 是一个键值用来标识一段全局唯一的共享内存。
size指定共享内存大小。若是创建则 size 必须被指定。若是获取则 size 可被设为 0.
shmflg与信号量的 semget 系统调用的 sem_flags参数相同但 这个再多两个标志1. SHM_HUGETLB系统将使用 “大页面” 来为共享内存分配空间2. SHM_NORESERVE不为共享内存保留交换分区swap空间。这样当物理内存不足时对该共享内存执行写操作将触发 SIGSEGV 信号。
返回成功正整数值共享内存标识符。失败-1errno扩展shmget 创建共享内存时所有字节都被初始化为 0与之关联的内核数据结构 shmid_ds 将被创建并初始化struct shmid_ds{struct ipc_perm shm_perm; //共享内存的操作权限size_t shm_segsz; //共享内存大小单位是字节__time_t shm_atime; //对这段内存最后一次调用 shmat 的时间__time_t shm_dtime; //对这段内存最后一次调用 shmdt 的时间__time_t shm_ctime; //对这段内存最后一次调用 shmct 的时间__pid_t shm_cpid; //创建者 pid__pid_t shm_lpid; //最后一次执行 shmat 或 shmdt 操作的 进程 pidshmatt_t shm_nattach; //目前关联到此共享内存的进程数量... //省略一些字段}#include sys/sem,h //此结构体用于描述 IPC 对象信号量、共享内存和消息队列的权限struct ipc_perm{key_t key; //键值uid_t uid; //所有者的有效用户 IDgid_t gid; //所有者的有效组 IDuid_t cuid; //创建者的有效用户 IDgid_t cgid; //创建者的有效组 IDmode_t mode; //访问权限... //省略其他字段}shmget 对 shmid_ds 初始化包括shm_perm.cuid 和 shm_perm.uid 设置为调用进程的有效用户 IDshm_perm.cgid 和 shm_perm.gid 设置为调用进程的有效组 IDshm_perm.mode 的最低 9 位设置为 sem_flags 参数的最低 9 位shm_segzs 设置为 sizeshm_lpid、shm_nattach、shm_atime、shm_dtime 设置为 0sem_ctime 设置为当前系统时间2.3 shmat 和 shmdt 系统调用
共享内存创建完后 1.不能立即访问而是首先将它关联到进程地址空间shmat 实现 2.使用完后必须将它从进程地址空间中分离shmdt 实现
#include sys/shm.h
void shmat(int shm_id, const void * shm_addr, int shmflg);
shm_id:创建的共享内存标识。
shm_addr指定将共享内存关联到进程的哪块地址空间。最终效果受 参数 shmflg 的可选标志 SHM_RND 的影响。可能的情况1. 若 shm_addr 为 NULL,则被关联的地址由操作系统选择。推荐这样做。2. 若 shm_addr 非空 且 SHM_RND 未设置,则共享内存被关联到 addr 指定地址处。3. 若 shm_addr 非空 且 设置了 SHM_RND ,则被关联的地址是 [shm_addr -(shm_addr % SHMLBA)]。SHMLBA 含义是“段低端边界地址倍数”Segment Low Boundary Address Multiple,它必须是内存页面大小PAGE_SIZE的整数倍。SHM_RND 的含义是圆整round即将共享内存被关联的地址向下圆整到离 shm_addr 最近的 SHMLBA 的整数倍地址处。
shmflg可选标志。如下SHM_RND SHM_RDONLY:进程仅能读取共享内存的内容。若没有指定此标志则进程可以对共享内存进行读写操作这取需要创建共享内存时指定其读写权限。SHM_REMAP:如果地址 shmaddr 已经被关联到一段共享内存上则重新关联。SHM_EXEC:指定对共享内存执行权限。
返回成功返回共享内存被关联到的地址。同时将修改内核数据结构 shmid_ds 部分字段如下shm_nattach 加 1.shm_lpid 设置为调用进程的 PID。shm_atime 设置为当前的时间。失败(void*)-1errno//将 shm_addr 处的共享内存从进程分离。
int shmdt (const void * shm_addr);
返回成功0。成功将修改内核 shmid_ds 部分字段shm_attach 减 1。shm_lpid 设置为 调用进程ID。shm_dtime 设置为当前时间。失败-1errno
2.4 shmctl 系统调用
//控制共享内存某些属性
#include sys/shm.h
int shmctl(int shm_id, int command ,struct shmid_ds * buf);
shm_id:共享内存标识符。
command指定要执行的命令。所有支持命令如下表 13-3。
bufL: 配合 command 使用详见表 13-3.
返回成功返回值取决于 command 命令详见 表 13-3.失败-1errno。 2.5 POSIX 的共享内存方法
注意使用如下 POSIX 共享内存函数编译时需要指定链接选项 -lrt//创建或打开一个 POSIX 共享内存对象
#include sys/mman.h
#include sys/stat.h
#include fcntl.h
int shm_open(const char * name, int oflag, mode_t mode);
shm_open 的使用方法与 open 系统调用完全相同。
name:指定要创建/打开的共享内存对象。从可移植性考虑此参数应使用 “/somename”的格式以 “/” 开始后接多个字符且这些字符都不是 “/”以 “\0” 结尾长度不超过 NAME_MAX 通常是 255。
oflag指定创建方式。可以是下列标志中 一个 或 多个的 按位或。O_RDONLY只读方式打开共享内存对象。O_RDWR 以可读、可写方式打开共享内存对象O_CREAT 若共享内存不存在则创建之。此时 mode 参数低 9 位指定该共享内存对象的访问权限。共享内存被创建时初始长度为 0.O_EXCL和 O_CREAT 一起使用。若由 name 指定的共享内存对象已存在则 shm_open 返回错误否则就创建一个新的共享内存对象。O_TRUNC若共享内存已存在则把它截断使其长度为 0。//删除创建的共享内存
#include sys/mman.h
#include sys/stat.h
#include fcntl.h
int shm_unlink(const char * name);
name:将该参数指定的共享内存对象标记为等待删除。当使用该共享内存对象的进程都使用 ummap 将它从进程分离后系统将销毁这个共享内存对象所占据的资源。
2.6 共享内存实例
/*注意点
1.每个子进程只会往自己所处理连接所对应的那一部分读缓存中写入数据所以使用共享内存的目的是“共享读”。
因此每个子进程在使用共享内存时都无需加锁这样符合“聊天室服务器”的应用场景同时提高性能。2.程序启动时给 users 分配了足够的空间使他可以存储所有可能的客户连接的相关数据。
同时sub_process 也分配了足够的空间。
这是牺牲空间换时间的例子。
*///一个子进程处理一个客户连接
//所有客户 socket 连接的读缓冲设计为一块共享内存
#include sys/socket.h
#include netinet/in.h
#include arpa/inet.h
#include assert.h
#include stdio.h
#include unistd.h
#include errno.h
#include string.h
#include fcntl.h
#include stdlib.h
#include sys/epoll.h
#include signal.h
#include sys/wait.h
#include sys/mman.h
#include sys/stat.h
#include fcntl.h#define USER_LIMIT 5
#define BUFFER_SIZE 1024
#define FD_LIMIT 65535
#define MAX_EVENT_NUMBER 1024
#define PROCESS_LIMIT 65536//处理客户连接必要的数据
struct client_data
{sockaddr_in address; int connfd; pid_t pid; //处理这个连接的子进程 IDint pipefd[2]; //和父进程通信用的管道
};static const char* shm_name /my_shm;
int sig_pipefd[2];
int epollfd;
int listenfd;
int shmfd;
char* share_mem 0;
//客户连接数组
client_data* users 0;
//子进程和客户连接的映射关系表用进程PID来索引这个数组即可获得该进程所处理的客户连接的编号
int* sub_process 0;//当前客户数量
int user_count 0;
bool stop_child false;int setnonblocking( int fd )
{int old_option fcntl( fd, F_GETFL );int new_option old_option | O_NONBLOCK;fcntl( fd, F_SETFL, new_option );return old_option;
}void addfd( int epollfd, int fd )
{epoll_event event;event.data.fd fd;event.events EPOLLIN | EPOLLET;epoll_ctl( epollfd, EPOLL_CTL_ADD, fd, event );setnonblocking( fd );
}void sig_handler( int sig )
{int save_errno errno;int msg sig;send( sig_pipefd[1], ( char* )msg, 1, 0 );errno save_errno;
}void addsig( int sig, void(*handler)(int), bool restart true )
{struct sigaction sa;memset( sa, \0, sizeof( sa ) );sa.sa_handler handler;if( restart ){sa.sa_flags | SA_RESTART;}sigfillset( sa.sa_mask );assert( sigaction( sig, sa, NULL ) ! -1 );
}void del_resource()
{close( sig_pipefd[0] );close( sig_pipefd[1] );close( listenfd );close( epollfd );shm_unlink( shm_name );delete [] users;delete [] sub_process;
}//停止一个子进程
void child_term_handler( int sig )
{stop_child true;
}//子进程运行函数
//idx子进程处理客户连接的编号
//users保存所有客户连接数据的数组
//share_mem住处共享内存地址
int run_child( int idx, client_data* users, char* share_mem )
{//子进程用 epoll 监听客户连接 和 与父进程通信的管道文件描述符epoll_event events[ MAX_EVENT_NUMBER ];int child_epollfd epoll_create( 5 );assert( child_epollfd ! -1 );int connfd users[idx].connfd;addfd( child_epollfd, connfd );int pipefd users[idx].pipefd[1];addfd( child_epollfd, pipefd );int ret;//子进程设置自己的信号处理函数addsig( SIGTERM, child_term_handler, false );while( !stop_child ){int number epoll_wait( child_epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number 0 ) ( errno ! EINTR ) ){printf( epoll failure\n );break;}for ( int i 0; i number; i ){int sockfd events[i].data.fd;//客户连接有数据到达if( ( sockfd connfd ) ( events[i].events EPOLLIN ) ){memset( share_mem idx*BUFFER_SIZE, \0, BUFFER_SIZE );//读取到对应的读缓存中读缓存是共享内存的一段它开始于 idx*BUFFER_SIZE 处长度为 BUFFER_SIZE 字节。故每个客户连接的读缓存是共享的。ret recv( connfd, share_mem idx*BUFFER_SIZE, BUFFER_SIZE-1, 0 );if( ret 0 ){if( errno ! EAGAIN ){stop_child true;}}else if( ret 0 ){stop_child true;}else{//成功读取客户数据后通知主进程通过管道来处理send( pipefd, ( char* )idx, sizeof( idx ), 0 );}}//主进程通知本进程通过管道将第client个客户额数据发送到本进程负责的客户端else if( ( sockfd pipefd ) ( events[i].events EPOLLIN ) ){int client 0;//接收主进程发送来的数据即有客户数据到达的连接编号ret recv( sockfd, ( char* )client, sizeof( client ), 0 );if( ret 0 ){if( errno ! EAGAIN ){stop_child true;}}else if( ret 0 ){stop_child true;}else{send( connfd, share_mem client * BUFFER_SIZE, BUFFER_SIZE, 0 );}}else{continue;}}}close( connfd );close( pipefd );close( child_epollfd );return 0;
}int main( int argc, char* argv[] )
{if( argc 2 ){printf( usage: %s ip_address port_number\n, basename( argv[0] ) );return 1;}const char* ip argv[1];int port atoi( argv[2] );int ret 0;struct sockaddr_in address;bzero( address, sizeof( address ) );address.sin_family AF_INET;inet_pton( AF_INET, ip, address.sin_addr );address.sin_port htons( port );listenfd socket( PF_INET, SOCK_STREAM, 0 );assert( listenfd 0 );ret bind( listenfd, ( struct sockaddr* )address, sizeof( address ) );assert( ret ! -1 );ret listen( listenfd, 5 );assert( ret ! -1 );user_count 0;users new client_data [ USER_LIMIT1 ];sub_process new int [ PROCESS_LIMIT ];for( int i 0; i PROCESS_LIMIT; i ){sub_process[i] -1;}epoll_event events[ MAX_EVENT_NUMBER ];epollfd epoll_create( 5 );assert( epollfd ! -1 );addfd( epollfd, listenfd );ret socketpair( PF_UNIX, SOCK_STREAM, 0, sig_pipefd );assert( ret ! -1 );setnonblocking( sig_pipefd[1] );addfd( epollfd, sig_pipefd[0] );addsig( SIGCHLD, sig_handler );addsig( SIGTERM, sig_handler );addsig( SIGINT, sig_handler );addsig( SIGPIPE, SIG_IGN );bool stop_server false;bool terminate false;//创建共享内存作为所有客户 socket 连接的读缓存shmfd shm_open( shm_name, O_CREAT | O_RDWR, 0666 );assert( shmfd ! -1 );ret ftruncate( shmfd, USER_LIMIT * BUFFER_SIZE ); assert( ret ! -1 );share_mem (char*)mmap( NULL, USER_LIMIT * BUFFER_SIZE, PROT_READ | PROT_WRITE, MAP_SHARED, shmfd, 0 );assert( share_mem ! MAP_FAILED );close( shmfd );while( !stop_server ){int number epoll_wait( epollfd, events, MAX_EVENT_NUMBER, -1 );if ( ( number 0 ) ( errno ! EINTR ) ){printf( epoll failure\n );break;}for ( int i 0; i number; i ){int sockfd events[i].data.fd;if( sockfd listenfd ) //新连接到来{struct sockaddr_in client_address;socklen_t client_addrlength sizeof( client_address );int connfd accept( listenfd, ( struct sockaddr* )client_address, client_addrlength );if ( connfd 0 ){printf( errno is: %d\n, errno );continue;}if( user_count USER_LIMIT ){const char* info too many users\n;printf( %s, info );send( connfd, info, strlen( info ), 0 );close( connfd );continue;}//保存新连接相关数据users[user_count].address client_address;users[user_count].connfd connfd;//创建与子进程管道ret socketpair( PF_UNIX, SOCK_STREAM, 0, users[user_count].pipefd );assert( ret ! -1 );pid_t pid fork();if( pid 0 ){close( connfd );continue;}else if( pid 0 ){close( epollfd );close( listenfd );close( users[user_count].pipefd[0] );close( sig_pipefd[0] );close( sig_pipefd[1] );run_child( user_count, users, share_mem );munmap( (void*)share_mem, USER_LIMIT * BUFFER_SIZE );exit( 0 );}else{close( connfd );close( users[user_count].pipefd[1] );addfd( epollfd, users[user_count].pipefd[0] );users[user_count].pid pid;//记录新客户连接在数组 users中的索引值建立进程pid 和该索引值间的映射sub_process[pid] user_count;user_count;}}//处理信号else if( ( sockfd sig_pipefd[0] ) ( events[i].events EPOLLIN ) ){int sig;char signals[1024];ret recv( sig_pipefd[0], signals, sizeof( signals ), 0 );if( ret -1 ){continue;}else if( ret 0 ){continue;}else{for( int i 0; i ret; i ){switch( signals[i] ){case SIGCHLD: //子进程退出表示某个客户端关闭了连接{pid_t pid;int stat;while ( ( pid waitpid( -1, stat, WNOHANG ) ) 0 ){//子进程pid取得被关闭连接编号int del_user sub_process[pid];sub_process[pid] -1;if( ( del_user 0 ) || ( del_user USER_LIMIT ) ){printf( the deleted user was not change\n );continue;}//清除关闭连接的相关数据epoll_ctl( epollfd, EPOLL_CTL_DEL, users[del_user].pipefd[0], 0 );close( users[del_user].pipefd[0] );users[del_user] users[--user_count];sub_process[users[del_user].pid] del_user;printf( child %d exit, now we have %d users\n, del_user, user_count ); }if( terminate user_count 0 ){stop_server true;}break;}case SIGTERM:case SIGINT:{ //结束服务器程序printf( kill all the clild now\n );//addsig( SIGTERM, SIG_IGN );//addsig( SIGINT, SIG_IGN );if( user_count 0 ){stop_server true;break;}for( int i 0; i user_count; i ){int pid users[i].pid;kill( pid, SIGTERM );}terminate true;break;}default:{break;}}}}}else if( events[i].events EPOLLIN ) //某个子进程向父进程写了数据{int child 0;ret recv( sockfd, ( char* )child, sizeof( child ), 0 ); //读取管道数据child 变量记录哪个客户连接有数据到达printf( read data from child accross pipe\n );if( ret -1 ){continue;}else if( ret 0 ){continue;}else{ //向除负责处理第 chold 个客户连接的子进程外的 其他子进程发送消息通知它们有客户要写数据for( int j 0; j user_count; j ){if( users[j].pipefd[0] ! sockfd ){printf( send data to child accross pipe\n );send( users[j].pipefd[0], ( char* )child, sizeof( child ), 0 );}}}}}}del_resource();return 0;
}3. 消息队列
消息队列在两个进程间传递 二进制块数据 的方式。
每个数据块都有一个类型接收方可以根据类型来有选择的接收数据。 而不像管道 和 命名管道那样必须先进先出的接收数据。
Linux消息队列 API 定义在 sys/msg.h 头文件中
包括 4 个系统调用
msgget、msgsnd、msgrcv、msgctl。3.1 msgget 系统调用
//创建一个消息队列或者获取一个消息队列
#include sys/msg.h
int msgget (key_t key, int msgflg);
key:表用来标识一个全局唯一的消息队列。
msgflg使用和含义与 semget 的参数 sem_flags参数相同。指定一组标志。它低端 9 个比特是信号量的权限其格式和含义都与系统调用的 open、model参数相同。此外它还可以和 IPC_CREAT 标志做按位 “或”运算以创建新的信号量集。确保创建一组新的、唯一的信号量集联合使用 IPC_CREAT 和 IPC_EXCL.此时若创建的信号量集已存在则 semget 返回错误 并设置 errno 为 EEXIST.返回成功返回正整数值是消息队列的标识符失败-1并设置 errno扩展若msgget用于创建消息队列则与之关联的内核数据结构 msqid_ds 将被创建并初始化struct msqid_ds{struct ipc_perm msg_perm; //消息队列的操作权限time_t msg_stime; //最后一次调用 msgsnd 的时间time_t msg_rtime; //最后一次调用 msgrcv 的时间time_t msg_ctime; //最后一次被修改的时间unsigned long __msg_cbytes; //消息队列中已有的字节数msqqnum_t msg_qnum; //消息队列中已有的消息数msglen_t msg_qbytes; //消息队列允许最大字节数pid_t msg_lspid; //最后执行 msgsnd 的进程 PIDpid_t msg_lrpid; //最后执行 msgrcv 的进程 PID} 3.2 msgsnd 系统调用
//把一条消息添加到消息队列
#include sys/msg.h
int msgsnd(int msqid, const void * msg_ptr, size_t msg_sz, int msgflg);
msqid消息队列标识符。即 msgget 调用返回值。
msg_ptr指向一个准备发送的消息必须被定义为如下类型struct msgbuf{long mtype; //消息类型必须是一个正整数。char mtext[512]; //消息数据}
msg_sz消息的数据部分mtext的长度。可以为 0表示没有消息数据。
msgflg:控制 msgsnd 的行为。支持 IPC_NOWAIT 标志即以非阻塞方式发送消息。此时 msgsnd 将立即返回并设置 errno 为 EAGAIN.默认情况下发送消息时若消息队列满了则 msgsnd 将阻塞。处于阻塞状态的 msgsnd 调用可能被如下两种情况所中断1.消息队列被移除。此时 msgsnd 将立即返回并设置 errno 为 EIDRM。2.程序接收到信号。此时 msgsnd 将立即返回并设置 errno 为 EINTR。返回成功0.成功将修改内核数据 msqid_ds 的部分字段如下msg_qnum 加1msg_lspid 设置为调用进程的 PIDmsg_stime 设置为当前时间失败-1errno
3.3 msgrcv 系统调用
//从消息队列中获取消息
#include sys/msg.h
int msgrcv(int msqid, void * msg_ptr,size_t msg_sz, long int msgtype, int msgflg);
msqid:消息队列标识符
msg_ptr用于存储接收的消息
msg_sz消息数据部分长度。
msgtype指定消息类型。msgtype 0: 读取消息队列中第一个消息。msgtype 0: 读取消息队列中第一个类型为 msgtype 的消息除非指定了标志 MSG_EXCEPTmsgtype 0: 读取消息队列中第一个类型值比 msgtype 的绝对值小的消息。
msgflg控制 msgrcv 的行为可以是如下标志的按位或IPC_NOWAIT: 如果消息队列中没有消息则 msgrcv 立即返回并设置 errno 为 ENOMSG.MSG_EXCEPT: 若 msgtype 0 则接收消息队列中第一个非 msgtype 类型的消息。MSG_NOERROR如果消息数据部分长度超过 msg_sz 就将它截断。返回成功0.成功将修改 内核数据结构 msqid_ds 中部分字段msg_qnum 减 1.msg_lrqid 设置为调用进程 pid。msg_rtime 设置为当前时间。失败 -1errno
扩展处于阻塞状态的 msgrcv 可能被如下两种异常中断1.消息队列被移除。 此时 msgrcv 将立即返回并设置 errno 为 EIDRM.2.程序接收到信号。 此时 msgrcv 将立即返回并设置 errno 为 EINTR.
3.4 msgctl 系统调用
//控制消息队列某些属性
#include sys/msg.h
int msgctl(int msqid, int command, struct msqid_ds * buf);
msqid共享内存标识符。
command指定要执行的命令。详见下表 13-4。
返回成功取决于 command 参数详见下表 13-4。失败-1errno。 4. IPC 命令 5.进程间传递文件描述符
fork 后父进程中打开的文件描述符在子进程中仍然保持打开。
传递文件描述符是要在接收进程中创建一个新的文件描述符并且该文件描述符和发送进程中被传递的文件描述符指向内核中相同文件表项。
示例
#include sys/socket.h
#include fcntl.h
#include stdio.h
#include unistd.h
#include stdlib.h
#include assert.h
#include string.hstatic const int CONTROL_LEN CMSG_LEN( sizeof(int) );/// brief 发送文件描述符
/// param fd 传递信息的 unix 域 socket
/// param fd_to_send 待发送额文件描述符
void send_fd( int fd, int fd_to_send )
{struct iovec iov[1];struct msghdr msg; char buf[0];iov[0].iov_base buf;iov[0].iov_len 1;msg.msg_name NULL; //消息的协议地址 协议地址和套接口信息//在非连接的UDP中发送者要指定对方地址端口接受方用于的到数据来源如果不需要的话可以设置为NULL//在TCP或者连接的UDP中一般设置为NULLmsg.msg_namelen 0; ///* 地址的长度 */msg.msg_iov iov; /* 多io缓冲区的地址 */ msg.msg_iovlen 1; /* 缓冲区的个数 */ cmsghdr cm;cm.cmsg_len CONTROL_LEN; /* 包含该头部的数据长度 */ cm.cmsg_level SOL_SOCKET; /* 具体的协议标识 */ cm.cmsg_type SCM_RIGHTS; /* 协议中的类型 */ *(int *)CMSG_DATA( cm ) fd_to_send;msg.msg_control cm; //设置辅助数据msg.msg_controllen CONTROL_LEN; /* 辅助数据的长度 */ sendmsg( fd, msg, 0 );//只用于套接口,不能用于普通的I/O读写参数sockfd则是指明要读写的套接口
}/// brief 接收文件描述符
/// param fd
/// return
int recv_fd( int fd )
{struct iovec iov[1];struct msghdr msg;char buf[0];iov[0].iov_base buf; //指定用户空间缓存区地址iov[0].iov_len 1; //指定 缓冲区长度为 1msg.msg_name NULL;msg.msg_namelen 0;msg.msg_iov iov;msg.msg_iovlen 1;cmsghdr cm;msg.msg_control cm;msg.msg_controllen CONTROL_LEN;recvmsg( fd, msg, 0 );int fd_to_read *(int *)CMSG_DATA( cm );return fd_to_read;
}int main()
{int pipefd[2];int fd_to_pass 0;int ret socketpair( PF_UNIX, SOCK_DGRAM, 0, pipefd ); //创建进程通信文件描述符assert( ret ! -1 );pid_t pid fork();assert( pid 0 );if ( pid 0 ){close( pipefd[0] );//子进程关闭一端fd_to_pass open( test.txt, O_RDWR, 0666 ); //子进程打开文件描述符send_fd( pipefd[1], ( fd_to_pass 0 ) ? fd_to_pass : 0 );//子进程向本地socket一端写数据并携带待传递的文件描述符close( fd_to_pass );//子进程关闭打开的文件描述符exit( 0 );}close( pipefd[1] );//主进程关闭写端因为子进程往这个方向写fd_to_pass recv_fd( pipefd[0] ); //主进程接收文件描述符char buf[1024];memset( buf, \0, 1024 );read( fd_to_pass, buf, 1024 ); //主进程读取接收的文件描述符中的数据printf( I got fd %d and data %s\n, fd_to_pass, buf );close( fd_to_pass ); //主进程关闭 接收的文件描述符。
}