Welcome 微信登录
编程资源 图片资源库 蚂蚁家优选 PDF转换器

首页 / 操作系统 / Linux / C语言实现简单线程池

有时我们会需要大量线程来处理一些相互独立的任务,为了避免频繁的申请释放线程所带来的开销,我们可以使用线程池。下面是一个C语言实现的简单的线程池。头文件: 1: #ifndef THREAD_POOL_H__<!--CRLF--> 2: #define THREAD_POOL_H__<!--CRLF--> 3:<!--CRLF--> 4: #include <pthread.h><!--CRLF--> 5:<!--CRLF--> 6: /* 要执行的任务链表 */<!--CRLF--> 7: typedef struct tpool_work {<!--CRLF--> 8: void* (*routine)(void*); /* 任务函数 */<!--CRLF--> 9: void*arg;/* 传入任务函数的参数 */<!--CRLF-->10: struct tpool_work *next;<!--CRLF-->11: }tpool_work_t;<!--CRLF-->12:<!--CRLF-->13: typedef struct tpool {<!--CRLF-->14: int shutdown;/* 线程池是否销毁 */<!--CRLF-->15: int max_thr_num;/* 最大线程数 */<!--CRLF-->16: pthread_t *thr_id;/* 线程ID数组 */<!--CRLF-->17: tpool_work_t*queue_head;/* 线程链表 */<!--CRLF-->18: pthread_mutex_t queue_lock;<!--CRLF-->19: pthread_cond_tqueue_ready;<!--CRLF-->20: }tpool_t;<!--CRLF-->21:<!--CRLF-->22: /*<!--CRLF-->23:* @brief 创建线程池 <!--CRLF-->24:* @param max_thr_num 最大线程数<!--CRLF-->25:* @return 0: 成功 其他: 失败<!--CRLF-->26:*/<!--CRLF-->27: int<!--CRLF-->28: tpool_create(int max_thr_num);<!--CRLF-->29:<!--CRLF-->30: /*<!--CRLF-->31:* @brief 销毁线程池 <!--CRLF-->32:*/<!--CRLF-->33: void<!--CRLF-->34: tpool_destroy();<!--CRLF-->35:<!--CRLF-->36: /*<!--CRLF-->37:* @brief 向线程池中添加任务<!--CRLF-->38:* @paramroutine 任务函数指针<!--CRLF-->39:* @param arg 任务函数参数<!--CRLF-->40:* @return 0: 成功 其他:失败 <!--CRLF-->41:*/<!--CRLF-->42: int<!--CRLF-->43: tpool_add_work(void*(*routine)(void*), void *arg);<!--CRLF-->44:<!--CRLF-->45: #endif<!--CRLF-->实现: 1: #include <unistd.h><!--CRLF--> 2: #include <stdlib.h><!--CRLF--> 3: #include <errno.h><!--CRLF--> 4: #include <string.h><!--CRLF--> 5: #include <stdio.h><!--CRLF--> 6:<!--CRLF--> 7: #include "tpool.h"<!--CRLF--> 8:<!--CRLF--> 9: static tpool_t *tpool = NULL;<!--CRLF-->10:<!--CRLF-->11: /* 工作者线程函数, 从任务链表中取出任务并执行 */<!--CRLF-->12: static void* <!--CRLF-->13: thread_routine(void *arg)<!--CRLF-->14: {<!--CRLF-->15: tpool_work_t *work;<!--CRLF-->16: <!--CRLF-->17: while(1) {<!--CRLF-->18: /* 如果线程池没有被销毁且没有任务要执行,则等待 */<!--CRLF-->19: pthread_mutex_lock(&tpool->queue_lock);<!--CRLF-->20: while(!tpool->queue_head && !tpool->shutdown) {<!--CRLF-->21: pthread_cond_wait(&tpool->queue_ready, &tpool->queue_lock);<!--CRLF-->22: }<!--CRLF-->23: if (tpool->shutdown) {<!--CRLF-->24: pthread_mutex_unlock(&tpool->queue_lock);<!--CRLF-->25: pthread_exit(NULL);<!--CRLF-->26: }<!--CRLF-->27: work = tpool->queue_head;<!--CRLF-->28: tpool->queue_head = tpool->queue_head->next;<!--CRLF-->29: pthread_mutex_unlock(&tpool->queue_lock);<!--CRLF-->30:<!--CRLF-->31: work->routine(work->arg);<!--CRLF-->32: free(work);<!--CRLF-->33: }<!--CRLF-->34: <!--CRLF-->35: return NULL; <!--CRLF-->36: }<!--CRLF-->37:<!--CRLF-->38: /*<!--CRLF-->39:* 创建线程池 <!--CRLF-->40:*/<!--CRLF-->41: int<!--CRLF-->42: tpool_create(int max_thr_num)<!--CRLF-->43: {<!--CRLF-->44: int i;<!--CRLF-->45:<!--CRLF-->46: tpool = calloc(1, sizeof(tpool_t));<!--CRLF-->47: if (!tpool) {<!--CRLF-->48: printf("%s: calloc failed ", __FUNCTION__);<!--CRLF-->49: exit(1);<!--CRLF-->50: }<!--CRLF-->51: <!--CRLF-->52: /* 初始化 */<!--CRLF-->53: tpool->max_thr_num = max_thr_num;<!--CRLF-->54: tpool->shutdown = 0;<!--CRLF-->55: tpool->queue_head = NULL;<!--CRLF-->56: if (pthread_mutex_init(&tpool->queue_lock, NULL) !=0) {<!--CRLF-->57: printf("%s: pthread_mutex_init failed, errno:%d, error:%s ",<!--CRLF-->58: __FUNCTION__, errno, strerror(errno));<!--CRLF-->59: exit(1);<!--CRLF-->60: }<!--CRLF-->61: if (pthread_cond_init(&tpool->queue_ready, NULL) !=0 ) {<!--CRLF-->62: printf("%s: pthread_cond_init failed, errno:%d, error:%s ", <!--CRLF-->63: __FUNCTION__, errno, strerror(errno));<!--CRLF-->64: exit(1);<!--CRLF-->65: }<!--CRLF-->66: <!--CRLF-->67: /* 创建工作者线程 */<!--CRLF-->68: tpool->thr_id = calloc(max_thr_num, sizeof(pthread_t));<!--CRLF-->69: if (!tpool->thr_id) {<!--CRLF-->70: printf("%s: calloc failed ", __FUNCTION__);<!--CRLF-->71: exit(1);<!--CRLF-->72: }<!--CRLF-->73: for (i = 0; i < max_thr_num; ++i) {<!--CRLF-->74: if (pthread_create(&tpool->thr_id[i], NULL, thread_routine, NULL) != 0){<!--CRLF-->75: printf("%s:pthread_create failed, errno:%d, error:%s ", __FUNCTION__, <!--CRLF-->76: errno, strerror(errno));<!--CRLF-->77: exit(1);<!--CRLF-->78: }<!--CRLF-->79: <!--CRLF-->80: }<!--CRLF-->81:<!--CRLF-->82: return 0;<!--CRLF-->83: }<!--CRLF-->84:<!--CRLF-->85: /* 销毁线程池 */<!--CRLF-->86: void<!--CRLF-->87: tpool_destroy()<!--CRLF-->88: {<!--CRLF-->89: int i;<!--CRLF-->90: tpool_work_t *member;<!--CRLF-->91:<!--CRLF-->92: if (tpool->shutdown) {<!--CRLF-->93: return;<!--CRLF-->94: }<!--CRLF-->95: tpool->shutdown = 1;<!--CRLF-->96:<!--CRLF-->97: /* 通知所有正在等待的线程 */<!--CRLF-->98: pthread_mutex_lock(&tpool->queue_lock);<!--CRLF-->99: pthread_cond_broadcast(&tpool->queue_ready);<!--CRLF--> 100: pthread_mutex_unlock(&tpool->queue_lock);<!--CRLF--> 101: for (i = 0; i < tpool->max_thr_num; ++i) {<!--CRLF--> 102: pthread_join(tpool->thr_id[i], NULL);<!--CRLF--> 103: }<!--CRLF--> 104: free(tpool->thr_id);<!--CRLF--> 105:<!--CRLF--> 106: while(tpool->queue_head) {<!--CRLF--> 107: member = tpool->queue_head;<!--CRLF--> 108: tpool->queue_head = tpool->queue_head->next;<!--CRLF--> 109: free(member);<!--CRLF--> 110: }<!--CRLF--> 111:<!--CRLF--> 112: pthread_mutex_destroy(&tpool->queue_lock);<!--CRLF--> 113: pthread_cond_destroy(&tpool->queue_ready);<!--CRLF--> 114:<!--CRLF--> 115: free(tpool);<!--CRLF--> 116: }<!--CRLF--> 117:<!--CRLF--> 118: /* 向线程池添加任务 */<!--CRLF--> 119: int<!--CRLF--> 120: tpool_add_work(void*(*routine)(void*), void *arg)<!--CRLF--> 121: {<!--CRLF--> 122: tpool_work_t *work, *member;<!--CRLF--> 123: <!--CRLF--> 124: if (!routine){<!--CRLF--> 125: printf("%s:Invalid argument ", __FUNCTION__);<!--CRLF--> 126: return -1;<!--CRLF--> 127: }<!--CRLF--> 128: <!--CRLF--> 129: work = malloc(sizeof(tpool_work_t));<!--CRLF--> 130: if (!work) {<!--CRLF--> 131: printf("%s:malloc failed ", __FUNCTION__);<!--CRLF--> 132: return -1;<!--CRLF--> 133: }<!--CRLF--> 134: work->routine = routine;<!--CRLF--> 135: work->arg = arg;<!--CRLF--> 136: work->next = NULL;<!--CRLF--> 137:<!--CRLF--> 138: pthread_mutex_lock(&tpool->queue_lock);<!--CRLF--> 139: member = tpool->queue_head;<!--CRLF--> 140: if (!member) {<!--CRLF--> 141: tpool->queue_head = work;<!--CRLF--> 142: } else {<!--CRLF--> 143: while(member->next) {<!--CRLF--> 144: member = member->next;<!--CRLF--> 145: }<!--CRLF--> 146: member->next = work;<!--CRLF--> 147: }<!--CRLF--> 148: /* 通知工作者线程,有新任务添加 */<!--CRLF--> 149: pthread_cond_signal(&tpool->queue_ready);<!--CRLF--> 150: pthread_mutex_unlock(&tpool->queue_lock);<!--CRLF--> 151:<!--CRLF--> 152: return 0;<!--CRLF--> 153: }<!--CRLF--> 154: <!--CRLF--> 155:<!--CRLF-->测试代码: 1: #include <unistd.h><!--CRLF--> 2: #include <stdio.h><!--CRLF--> 3: #include <stdlib.h><!--CRLF--> 4: #include "tpool.h"<!--CRLF--> 5:<!--CRLF--> 6: void *func(void *arg)<!--CRLF--> 7: {<!--CRLF--> 8: printf("thread %d ", (int)arg);<!--CRLF--> 9: return NULL;<!--CRLF-->10: }<!--CRLF-->11:<!--CRLF-->12: int<!--CRLF-->13: main(int arg, char **argv)<!--CRLF-->14: {<!--CRLF-->15: if (tpool_create(5) != 0) {<!--CRLF-->16: printf("tpool_create failed ");<!--CRLF-->17: exit(1);<!--CRLF-->18: }<!--CRLF-->19: <!--CRLF-->20: int i;<!--CRLF-->21: for (i = 0; i < 10; ++i) {<!--CRLF-->22: tpool_add_work(func, (void*)i);<!--CRLF-->23: }<!--CRLF-->24: sleep(2);<!--CRLF-->25: tpool_destroy();<!--CRLF-->26: return 0;<!--CRLF-->27: }<!--CRLF-->这个实现是在调用tpool_destroy之后,仅将当前正在执行的任务完成之后就会退出,我们也可以修改代码使得线程池在执行完任务链表中所有任务后再退出。