加入收藏 | 设为首页 | 会员中心 | 我要投稿 云计算网_汕头站长网 (https://www.0754zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

Linux 线程池

发布时间:2023-01-31 11:28:15 所属栏目:Linux 来源:
导读:  什么是线程池,为什么要使用线程池?

  什么是线程池?

  首先顾名思义,就是把一堆开辟好的线程放在一个池子统一管理,就是一个线程池。

  为什么要使用线程池?

  难道来一个请求给它申
  什么是线程池,为什么要使用线程池?
 
  什么是线程池?
 
  首先顾名思义,就是把一堆开辟好的线程放在一个池子统一管理,就是一个线程池。
 
  为什么要使用线程池?
 
  难道来一个请求给它申请一个线程,请求处理完了释放线程不行吗?也行,但是如果创建线程和销毁线程的时间比线程处理请求时间长,而且请求很多的情况下,我们的 CPU 资源都浪费在创建线程和销毁线程上了线程池linux,所以这种方法效率会比较低。于是,我们可以将若干已经创建好的线程放在一起统一管理,如果来了一个请求,我们就从线程池中取出一个线程来处理请求,处理完了放回池内等待下一个任务,线程池的好处是避免了繁琐的创建和结束线程的时间,有效的利用了 CPU 资源。
 
  还有一个很重要的作用就是异步解耦。我们在做开发的时候,日志也是很重要的。对于日志的保存,一般都是保存到磁盘上,要知道磁盘的读写要比内存慢很多倍。如果日志直接写入磁盘的话,会发现整个服务器吞吐量的性能被压在磁盘的读写上面。如何解决性能不被压在磁盘上面,我们引入线程池。线程池起到了一个异步解耦的作用。那什么是异步解耦?就是写磁盘和落盘这个两个动作不在一个流程里面。
 
  线程池的模型
 
  按照我的理解,线程池的作用和双缓冲的作用类似,可以完成任务处理的 ”鱼贯动作“。
 
  最后,如何才能创建一个线程池的模型呢?一般需要以下三个参与者:
 
  线程池结构,它负责管理多个线程并提供任务队列的接口工作线程,它们负责处理任务任务队列,存放待处理的任务
 
  有了三个参与者,下一个问题就是怎么使线程池安全有序的工作,可以使用 POSIX 中的信号量、互斥锁和条件变量等同步手段。下面描述了一个客户端发起请求时,服务器端是如何从线程池中取出一个线程进行处理的。
 
  #include
  #include
  #include
  #include
  #include
  // 向队列中插入一个节点,采用头插法
  #define LL_ADD(item, list) do{ \
   item->prev = NULL; \
   item->next = list; \
   if(list) \
   list->prev = item; \
   list = item; \
  }while(0);
  //从队列中移除一个节点
  #define LL_REMOVE(item, list) do{ \
   if(item->prev) \
   item->prev->next = item->next; \
   if(item->next) \
   item->next->prev = item->prev; \
   if(list == item) \
   list = item->next; \
   item->next = item->prev = NULL;
  }while(0);
  // 工作线程
  struct NWORKER {
   pthread_t thread;
   struct threadpool_t *pool;
   int terminate;
  
   struct NWORKER *prev;
   struct NWORKER *next;
  };
  // 任务队列
  struct NJOB{
   void (*func)(struct NJOB *job);
   void *user_data;
   struct NJOB *prev;
   struct NJOB *next;
  };
  // 管理组件:管理工作线程和任务队列
  struct threadpool_t{
   struct NWORKER *workers;
   struct NJOB *jobs;
   pthread_cond_t jobs_cond; // 任务到来的环境变量
   pthread_mutex_t jobs_mutex; // 对任务队列加锁
  };
  // static 只在本文件有效
  static void *ThreadCallback(void *args)
  {
   struct NWORKER *worker = (struct NWORKER *)args;
   // 每个线程两种状态:执行/等待
   while(1)
   {
   // 为什么要加锁?
   // 因为进入pthread_cond_wait函数,首先会解锁,函数返回会进行加锁
   pthread_mutex_lock(&worker->pool->jobs_mutex);
   while(worker->pool->jobs == NULL) // 没有任务
   {
   if(worker->terminate)
   break;
   pthread_cond_wait(&worker->pool->jobs_cond, &worker->pool->jobs_mutex);
   }
   if(worker->terminate)
   {
   pthread_mutex_unlock(&worker->pool->jobs_mutex);
   break;
   }
   struct NJOB *job = worker->pool->jobs;
   LL_REMOVE(job, worker->pool->jobs); // 删除头节点
   pthread_mutex_unlock(&worker->pool->jobs_mutex);
   job->func(job->user_data);
   }
   free(worker);
   pthread_exit(NULL);
  }
  // 线程池的初始化
  int ThreadPoolCreate(struct threadpool_t *pool, int workers)
  {
   if(workers < 1)
   workers = 1;
   if(!pool)
   return -1;
   memset(pool, 0, sizeof(struct threadpool_t));
   pthread_cond_t blank_cond = PTHREAD_COND_INITIALIZER;
   memcpy(pool->jobs_cond, &blank_cond, sizeof(pthread_cond_t));
   pthread_mutex_t blank_mutex = PTHREAD_MUTEX_INITIALIZER;
   memcpy(pool->jobs_mutex, &blank_mutex, sizeof(pthread_mutex_t));
   int i = 0;
   for(; i < workers; i++)
   {
   struct NWORKER *worker = (struct NWORKER *)malloc(sizeof( struct NWORKER));
   if(!worker)
   assert(worker != NULL);
   memset(worker, 0, sizeof(worker));
   worker->pool = pool;
   int ret = pthread_create(worker->thread, NULL, ThreadCallback, (void *)worker);
   assert(ret == 0);
   LL_ADD(worker, pool->workers);
   }
   return 0;
  }
  // 往线程池中添加一个任务
  void ThreadPoolPush(struct threadpool_t *pool, struct NJOB *job)
  {
   // 加锁
   pthread_mutex_lock(&pool->jobs_mutex);
  
   // 将任务加入到任务列表中
   LL_ADD(job, pool->jobs);
   // 唤醒线程池中一个线程
   pthread_cond_signal(&pool->jobs_cond);
   // 解锁
   pthread_mutex_unlock(&pool->jobs_mutex);
  }
  // 销毁线程池
  int ThreadPoolDestroy(struct threadpool_t *pool)
  {
   if(!pool)
   return -1;
   struct NWORKER *worker = NULL;
   for(worker = pool->workers; worker != NULL; worker = worker->next)
   {
   worker->terminate = 1;
   }
  
   // 加锁
   pthread_mutex_lock(&pool->jobs_mutex);
   // 唤醒所有线程
   pthread_cond_broadcast(&pool->jobs_cond);
   // 解锁
   pthread_mutex_unlock(&pool->jobs_mutex);
  }
 

(编辑:云计算网_汕头站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!