加入收藏 | 设为首页 | 会员中心 | 我要投稿 云计算网_汕头站长网 (https://www.0754zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 服务器 > 搭建环境 > Linux > 正文

线程池(Linux实现)

发布时间:2022-10-25 11:22:39 所属栏目:Linux 来源:
导读:  本文技术参考了sourceforge项目c thread pool,链接:

  线程池如上一篇随笔()提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,
  本文技术参考了sourceforge项目c thread pool,链接:
 
  线程池如上一篇随笔()提到的内存池一样,也是一种池化策略,在启动时(或者更高级的,运行时按一定策略分配)预先开启N个线程,当没有工作要做时,这些线程处于睡眠中;一旦有工作加入工作队列,其中的某些线程就会醒来,处理这些工作线程池linux,完成后继续睡眠 。
 
  要实现线程池(只针对本文的简单实现而言),应设计和构建3样东西:
 
  含N个线程的线程组工作队列工作线程例程
 
  线程组和工作队列表示如下:
 
  /*
   *     Threads:
   *
   *     +----------+----------+------+------------+
   *     | thread 0 | thread 1 | .... | thread n-1 |
   *     +----------+----------+------+------------+
   *
   *     Job Queue:
   *
   *        back                            front
   *         |                                |
   *         v                                v
   *     +-------+    +-------+           +-------+
   *     | job 0 | -> | job 1 | -> ... -> | job x |
   *     +-------+    +-------+           +-------+
   *
   */
  线程组可以用普通数组或者动态分配的数组实现,维数就是池中线程数量,存放的其实是线程ID。工作队列可以直接用C++ queue容器实现。
 
  工作线程例程(线程函数)的大致执行流程如下图所示:
 
  /*
   *
   *     Each Thread Routine:
   *                                 Job-Queue
   *              |                    ...
   *              v                     |
   *          +-------+            +---------+   EnQueue
   *    +---> | sleep |  (No job)  | new job | <--------- Client
   *    |     +-------+            +---------+
   *    |         |                     |
   *    |         |     DeQueue    +---------+
   *    |         +  <-----------  | new job |
   *    |         |                +---------+
   *    |         v
   *    |    +---------+
   *    |    | do work |
   *    |    +---------+
   *    |         |
   *    |         |
   *    +----<----+
   *
   */
  工作队列中没有工作时它就睡眠 ,有工作时苏醒,从队列首部取出(&删除)一个工作,然后开始执行。
 
  另外,我们还需要一个互斥锁L和一个计数信号量S,互斥锁用来同步工作队列的增删操作,计数信号量用来对工作队列中的工作数量进行记录。工作线程会一直等待S,直到它大于0。
 
  下面给出完整代码。
 
  1. threadpool.h
 
   1 /*
   2  * Linux线程池的简单实现.
   3  * Author: 赵子清
   4  * Blog: http://www.cnblogs.com/zzqcn
   5  *
   6  **/
   7
   8
   9
  10 #ifndef __THREADPOOL_H__
  11 #define __THREADPOOL_H__
  12
  13
  14 #include
  15 #include
  16 #include
  17
  18
  19
  20 #define  DLPTP_MAX_THREADS    1024
  21
  22
  23 struct tp_job_t
  24 {
  25     void        (*work) (void*);
  26     void*        arg;
  27 };
  28
  29 struct tp_threadpool_t
  30 {
  31     pthread_t*            threads;
  32     size_t                nthreads;
  33     std::queue    jobs;
  34     sem_t                njobs;
  35     pthread_mutex_t        lock;
  36     bool                running;
  37 };
  38
  39
  40 tp_threadpool_t*  tp_init(size_t _nthreads);
  41 int     tp_deinit(tp_threadpool_t* _ptp);
  42 void*   tp_worker(void* _ptp);
  43 int     tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg);
  44
  45
  46 #endif
  47  
  2. threadpool.cpp
 
    1 /*
    2  * Linux线程池的简单实现.
    3  * Author: 赵子清
    4  * Blog: http://www.cnblogs.com/zzqcn
    5  *
    6  **/
    7
    8
    9
   10 #include "threadpool.h"
   11
   12
   13
   14 tp_threadpool_t*  tp_init(size_t _nthreads)
   15 {
   16     if(_nthreads < 1 || _nthreads > DLPTP_MAX_THREADS)
   17         return  NULL;
   18
   19     int  err = 0;
   20     tp_threadpool_t*  ret = NULL;
   21     size_t  i, j;
   22
   23     ret = new tp_threadpool_t;
   24     if(NULL == ret)
   25         return  NULL;
   26     ret->nthreads = _nthreads;
   27     ret->threads = new pthread_t[_nthreads];
   28     if(NULL == ret->threads)
   29     {
   30         delete ret;
   31         return  NULL;
   32     }
   33     ret->running = true;
   34
   35     err = sem_init(&ret->njobs, 0, 0);
   36     if(-1 == err)
   37     {
   38         delete[] ret->threads;
   39         delete ret;
   40         return  NULL;
   41     }
   42
   43     err = pthread_mutex_init(&ret->lock, NULL);
   44     if(err)
   45     {
   46         sem_destroy(&ret->njobs);
   47         delete[] ret->threads;
   48         delete ret;
   49         return  NULL;
   50     }
   51
   52     for(i=0; i<_nthreads; ++i)
   53     {
   54         err = pthread_create(&ret->threads[i], NULL, tp_worker, (void*)ret);
   55         if(err)
   56         {
   57             ret->running = false;
   58             for(j=0; jj)
   59             {
   60                 pthread_cancel(ret->threads[j]);
   61                 pthread_join(ret->threads[j], NULL);
   62             }
   63             pthread_mutex_destroy(&ret->lock);
   64             sem_destroy(&ret->njobs);
   65             delete[] ret->threads;
   66             delete ret;
   67             return  NULL;
   68         }
   69     }
   70
   71     return ret;
   72 }
   73
   74
   75 int  tp_deinit(tp_threadpool_t* _ptp)
   76 {
   77     if(NULL == _ptp)
   78         return  -1;
   79
   80     int  err = 0;
   81     size_t  i, j;
   82
   83     // TODO: if now worker has job to handle, do something then exit
   84     while(!_ptp->jobs.empty());
   85
   86     _ptp->running = false;
   87
   88     for(i=0; i<_ptp->nthreads; ++i)
   89     {
   90         err = sem_post(&_ptp->njobs);              /* V, ++ */
   91         if(err)
   92         {
   93             for(j=i; j<_ptp->nthreads; ++j)
   94                 pthread_cancel(_ptp->threads[j]);
   95             break;
   96         }
   97     }
   98
   99     for(i=0; i<_ptp->nthreads; ++i)
  100         pthread_join(_ptp->threads[i], NULL);
  101
  102     pthread_mutex_destroy(&_ptp->lock);
  103     sem_destroy(&_ptp->njobs);
  104
  105     delete[] _ptp->threads; _ptp->threads = NULL;
  106     delete _ptp;            _ptp = NULL;
  107
  108     return  0;
  109 }
  110
  111
  112 void*  tp_worker(void* _ptp)
  113 {
  114     if(NULL == _ptp)
  115         return  NULL;
  116
  117     tp_threadpool_t* p = (tp_threadpool_t*)_ptp;
  118
  119     while(p->running)
  120     {
  121         sem_wait(&p->njobs);                /* P, -- */
  122
  123         if(!p->running)
  124             return  NULL;
  125
  126         void   (*work) (void*);
  127         void*  arg;
  128         tp_job_t  job;
  129
  130         pthread_mutex_lock(&p->lock);       /* LOCK */
  131
  132         job = p->jobs.front();
  133         work = job.work;
  134         arg = job.arg;
  135         p->jobs.pop();
  136
  137         pthread_mutex_unlock(&p->lock);     /* UNLOCK */
  138
  139         work(arg);
  140     }
  141
  142     return  NULL;
  143 }
  144
  145
  146 int  tp_add_job(tp_threadpool_t* _ptp, void (*_work)(void*), void* _arg)
  147 {
  148     if(NULL == _ptp || NULL == _work)
  149         return  -1;
  150
  151     tp_job_t  job;
  152     job.work = _work;
  153     job.arg = _arg;
  154
  155     pthread_mutex_lock(&_ptp->lock);        /* LOCK */
  156     _ptp->jobs.push(job);
  157     sem_post(&_ptp->njobs);                 /* V, ++ */
  158     pthread_mutex_unlock(&_ptp->lock);      /* UNLOCK */
  159
  160     return  0;
  161 }
  3. 测试程序main.cpp
 
   1 /*
   2  * Linux线程池测试.
   3  * Author: 赵子清
   4  * Blog: http://www.cnblogs.com/zzqcn
   5  *
   6  **/
   7
   8 #include
   9 #include
  10 #include "threadpool.h"
  11
  12
  13 /* task 1 */
  14 void task1(void* _arg)
  15 {
  16     printf("# Thread working: %u\n", (int)pthread_self());
  17     printf("  Task 1 running..\n");
  18     usleep(5000);
  19 }
  20
  21
  22 /* task 2 */
  23 void task2(void* _arg)
  24 {
  25     printf("# Thread working: %u\n", (int)pthread_self());
  26     printf("  Task 2 running..  ");
  27     printf("%d\n", *((int*)_arg));
  28     usleep(5000);
  29 }
  30
  31
  32 #define  N_THREADS  4
  33
  34 int  main(int argc, char** argv)
  35 {
  36     tp_threadpool_t*  ptp = NULL;
  37     int  i;
  38     
  39     ptp = tp_init(N_THREADS);
  40     if(NULL == ptp)
  41     {
  42         fprintf(stderr, "tp_init fail\n");
  43         return -1;
  44     }
  45
  46     int  a = 32;
  47     for(i=0; i<10; ++i)
  48     {
  49         tp_add_job(ptp, task1, NULL);
  50         tp_add_job(ptp, task2, (void*)&a);
  51     }
  52
  53     tp_deinit(ptp);
  54
  55     return  0;
  56 }
 

(编辑:云计算网_汕头站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!