Linux 线程池
发布时间:2023-01-31 11:28:15 所属栏目:Linux 来源:
导读: 什么是线程池,为什么要使用线程池?
什么是线程池?
首先顾名思义,就是把一堆开辟好的线程放在一个池子统一管理,就是一个线程池。
为什么要使用线程池?
难道来一个请求给它申
什么是线程池?
首先顾名思义,就是把一堆开辟好的线程放在一个池子统一管理,就是一个线程池。
为什么要使用线程池?
难道来一个请求给它申
|
什么是线程池,为什么要使用线程池? 什么是线程池? 首先顾名思义,就是把一堆开辟好的线程放在一个池子统一管理,就是一个线程池。 为什么要使用线程池? 难道来一个请求给它申请一个线程,请求处理完了释放线程不行吗?也行,但是如果创建线程和销毁线程的时间比线程处理请求时间长,而且请求很多的情况下,我们的 CPU 资源都浪费在创建线程和销毁线程上了线程池linux,所以这种方法效率会比较低。于是,我们可以将若干已经创建好的线程放在一起统一管理,如果来了一个请求,我们就从线程池中取出一个线程来处理请求,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的创建和结束线程的时间,有效的利用了 CPU 资源。 还有一个很重要的作用就是异步解耦。我们在做开发的时候,日志也是很重要的。对于日志的保存,一般都是保存到磁盘上,要知道磁盘的读写要比内存慢很多倍。如果日志直接写入磁盘的话,会发现整个服务器吞吐量的性能被压在磁盘的读写上面。如何解决性能不被压在磁盘上面,我们引入线程池。线程池起到了一个异步解耦的作用。那什么是异步解耦?就是写磁盘和落盘这个两个动作不在一个流程里面。 线程池的模型 按照我的理解,线程池的作用和双缓冲的作用类似,可以完成任务处理的 ”鱼贯动作“。 最后,如何才能创建一个线程池的模型呢?一般需要以下三个参与者: 线程池结构,它负责管理多个线程并提供任务队列的接口工作线程,它们负责处理任务任务队列,存放待处理的任务 有了三个参与者,下一个问题就是怎么使线程池安全有序的工作,可以使用 POSIX 中的信号量、互斥锁和条件变量等同步手段。下面描述了一个客户端发起请求时,服务器端是如何从线程池中取出一个线程进行处理的。 #include #include #include #include #include // 向队列中插入一个节点,采用头插法 #define LL_ADD(item, list) do{ \ item->prev = NULL; \ item->next = list; \ if(list) \ list->prev = item; \ list = item; \ }while(0); //从队列中移除一个节点 #define LL_REMOVE(item, list) do{ \ if(item->prev) \ item->prev->next = item->next; \ if(item->next) \ item->next->prev = item->prev; \ if(list == item) \ list = item->next; \ item->next = item->prev = NULL; }while(0); // 工作线程 struct NWORKER { pthread_t thread; struct threadpool_t *pool; int terminate; struct NWORKER *prev; struct NWORKER *next; }; // 任务队列 struct NJOB{ void (*func)(struct NJOB *job); void *user_data; struct NJOB *prev; struct NJOB *next; }; // 管理组件:管理工作线程和任务队列 struct threadpool_t{ struct NWORKER *workers; struct NJOB *jobs; pthread_cond_t jobs_cond; // 任务到来的环境变量 pthread_mutex_t jobs_mutex; // 对任务队列加锁 }; // static 只在本文件有效 static void *ThreadCallback(void *args) { struct NWORKER *worker = (struct NWORKER *)args; // 每个线程两种状态:执行/等待 while(1) { // 为什么要加锁? // 因为进入pthread_cond_wait函数,首先会解锁,函数返回会进行加锁 pthread_mutex_lock(&worker->pool->jobs_mutex); while(worker->pool->jobs == NULL) // 没有任务 { if(worker->terminate) break; pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex); } if(worker->terminate) { pthread_mutex_unlock(&worker->pool->jobs_mutex); break; } struct NJOB *job = worker->pool->jobs; LL_REMOVE(job, worker->pool->jobs); // 删除头节点 pthread_mutex_unlock(&worker->pool->jobs_mutex); job->func(job->user_data); } free(worker); pthread_exit(NULL); } // 线程池的初始化 int ThreadPoolCreate(struct threadpool_t *pool, int workers) { if(workers < 1) workers = 1; if(!pool) return -1; memset(pool, 0, sizeof(struct threadpool_t)); pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER; memcpy(pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t)); pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER; memcpy(pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t)); int i = 0; for(; i < workers; i++) { struct NWORKER *worker = (struct NWORKER *)malloc(sizeof( struct NWORKER)); if(!worker) assert(worker != NULL); memset(worker, 0, sizeof(worker)); worker->pool = pool; int ret = pthread_create(worker->thread, NULL, ThreadCallback, (void *)worker); assert(ret == 0); LL_ADD(worker, pool->workers); } return 0; } // 往线程池中添加一个任务 void ThreadPoolPush(struct threadpool_t *pool, struct NJOB *job) { // 加锁 pthread_mutex_lock(&pool->jobs_mutex); // 将任务加入到任务列表中 LL_ADD(job, pool->jobs); // 唤醒线程池中一个线程 pthread_cond_signal(&pool->jobs_cond); // 解锁 pthread_mutex_unlock(&pool->jobs_mutex); } // 销毁线程池 int ThreadPoolDestroy(struct threadpool_t *pool) { if(!pool) return -1; struct NWORKER *worker = NULL; for(worker = pool->workers; worker != NULL; worker = worker->next) { worker->terminate = 1; } // 加锁 pthread_mutex_lock(&pool->jobs_mutex); // 唤醒所有线程 pthread_cond_broadcast(&pool->jobs_cond); // 解锁 pthread_mutex_unlock(&pool->jobs_mutex); } (编辑:云计算网_汕头站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |
站长推荐


浙公网安备 33038102330478号