可伸缩多线程任务队列(转载)

出自:http://blog.csdn.net/tianmohust/article/details/9335243

在我们的工作中,我们经常需要异步执行一些任务,下面介绍的这个可伸缩多线程队列,可满足我们的需求。

  出自:http://www.codeproject.com/Articles/4148/Multithreaded-Job-Queue,主要有以下几个功能:

    1、任务队列是多线程,许多任务可以异步进行,任务队列使用线程池来执行任务。

    2、任务队列支持优先级,优先级高的任务优先执行(即使是后来添加的)

    3、任务队列可以被暂停,但是用户还是可以添加任务,当任务队列被唤醒时,任务可以继续执行下去

    4、在运行过程中,任务队列使用的线程池,用户可以自行增加和减少

  大体框架主要由3个类构成

    1、CJob,任务类,用户需要从该类派生来实现自身需要完成的任务

    2、CJobExecuter,任务执行类,任务均由该类来调用执行,每一个类相当于对应一个线程

    3、CMThreadedJobQ,多线程任务队列,添加任务已经任务的分发均由该类完成,该类维护一个任务队列和一个完成队列的线程池。

  类图如下:

  该例子中,CJobExecuter和CMThreadJobQ这两个类的调用关系是非常值得我们学习的,同时,CJob作为一个基类,子类派生可以实现不同的任务,可扩展性也不错。源代码解析如下:

Job.h文件:

 1 class CJob
 2 {
 3 public:
 4     CJob();
 5     virtual ~CJob();
 6     BOOL m_Completed;         //任务是否完成:TRUE 完成,FALSE 未完成
 7     static long lastUsedID;   //最后的ID
 8     //================================================================================================
 9     //函数名:                  setPriority
10     //函数描述:                设置任务优先级
11     //输入:                    [in] priority 优先级别
12     //输出:                    无
13     //返回:                    无
14     //================================================================================================
15     void setPriority(int priority);
16     //================================================================================================
17     //函数名:                  getPriority
18     //函数描述:                返回任务优先级
19     //输入:                    无
20     //输出:                    无
21     //返回:                    任务优先级
22     //================================================================================================
23     int getPriority();
24     //================================================================================================
25     //函数名:                  getID
26     //函数描述:                返回任务ID
27     //输入:                    无
28     //输出:                    无
29     //返回:                    任务ID
30     //================================================================================================
31     long getID();
32     //================================================================================================
33     //函数名:                  setAutoDelete
34     //函数描述:                设置完成任务后是否删除任务
35     //输入:                    [in] autoDeleteFlag
36     //输出:                    无
37     //返回:                    无
38     //================================================================================================
39     void setAutoDelete(BOOL autoDeleteFlag = TRUE);
40     //================================================================================================
41     //函数名:                  AutoDelete
42     //函数描述:                返回删除任务标记
43     //输入:                    无
44     //输出:                    无
45     //返回:                    任务标记
46     //================================================================================================
47     BOOL AutoDelete();
48     //================================================================================================
49     //函数名:                  execute
50     //函数描述:                任务真正工作的函数,纯虚函数,需要子类化实现
51     //输入:                    无
52     //输出:                    无
53     //返回:                    任务ID
54     //================================================================================================
55     virtual void execute() = 0;
56 private:
57     long m_ID;               //任务ID
58     BOOL m_autoDeleteFlag;   //是否自动删除任务标记,TRUE 删除,FALSE 不删除,默认为TRUE
59     int m_priority;          //任务优先级,默认为5
60 };

Job.cpp文件:

 1 long CJob::lastUsedID = 0;
 2
 3 CJob::CJob()
 4 {
 5     this->m_ID = InterlockedIncrement(&lastUsedID);
 6     this->m_autoDeleteFlag = TRUE;
 7     this->m_priority = 5;
 8     this->m_Completed= FALSE;
 9 }
10 CJob::~CJob()
11 {
12 }
13 BOOL CJob::AutoDelete()
14 {
15     return m_autoDeleteFlag;
16 }
17 void CJob::setAutoDelete(BOOL autoDeleteFlag)
18 {
19     m_autoDeleteFlag = autoDeleteFlag;
20 }
21 long CJob::getID()
22 {
23     return this->m_ID;
24 }
25 int CJob::getPriority()
26 {
27     return this->m_priority;
28 }
29 void CJob::setPriority(int priority)
30 {
31     this->m_priority = priority;
32 }

JobExecuter.h文件:

 1 //一个对象对应一个线程,执行任务Job
 2 class CJobExecuter
 3 {
 4 public:
 5     CJobExecuter(CMThreadedJobQ *pJobQ);
 6     virtual ~CJobExecuter();
 7     //================================================================================================
 8     //函数名:                  stop
 9     //函数描述:                停止执行任务
10     //输入:                    无
11     //输出:                    无
12     //返回:                    无
13     //================================================================================================
14     void stop();
15     //================================================================================================
16     //函数名:                  execute
17     //函数描述:                执行一个任务
18     //输入:                    [in] pJob 任务指针
19     //输出:                    无
20     //返回:                    无
21     //================================================================================================
22     void execute(CJob* pJob);
23     static UINT ThreadFunction(LPVOID pParam); //线程函数
24     CMThreadedJobQ* m_pJobQ;                   //指向线程任务队列指针
25     CJob* m_pJob2Do;                           //指向正在执行任务的指针
26     int m_flag;                                //线程执行标记
27     CWinThread* m_pExecuterThread;             //线程标识符
28 };

JobExecuter.cpp文件:

 1 #define STOP_WORKING -1
 2 #define KEEP_WORKING  0
 3 CJobExecuter::CJobExecuter(CMThreadedJobQ *pJobQ)
 4 {
 5     this->m_pJobQ= pJobQ;
 6     this->m_pExecuterThread= AfxBeginThread(ThreadFunction,this);
 7     this->m_pJob2Do = NULL;
 8     this->m_flag = KEEP_WORKING;
 9 }
10 CJobExecuter::~CJobExecuter()
11 {
12     if(this->m_pExecuterThread!= NULL )
13     {
14         this->m_pExecuterThread->ExitInstance();
15         delete m_pExecuterThread;
16     }
17 }
18 UINT CJobExecuter::ThreadFunction(LPVOID pParam)
19 {
20     CJobExecuter *pExecuter = (CJobExecuter *)pParam;
21     pExecuter->m_flag = 1;
22     ::Sleep(1);
23     CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);
24     while(pExecuter->m_flag !=STOP_WORKING )
25     {
26         if(pExecuter->m_pJob2Do!=  NULL)
27         {
28             pExecuter->m_pJob2Do->execute();
29             pExecuter->m_pJob2Do->m_Completed = TRUE;
30             if(pExecuter->m_pJob2Do->AutoDelete())
31                 delete pExecuter->m_pJob2Do;
32             pExecuter->m_pJob2Do = NULL;
33         }
34
35         if(pExecuter->m_pJobQ == NULL) break;
36         CSingleLock singleLock(&pExecuter->m_pJobQ->m_cs);
37         singleLock.Lock();
38         if(pExecuter->m_pJobQ->getNoOfExecuter() > pExecuter->m_pJobQ->getMaxNoOfExecuter()) //CJobExecuter个数大于最大值,自动销毁
39         {
40             pExecuter->stop();
41             singleLock.Unlock();
42         }
43         else
44         {
45             pExecuter->m_pJobQ->addFreeJobExecuter(pExecuter);      //完成任务后,添加到CMThreadedJobQ的空闲队列中
46             singleLock.Unlock();
47             pExecuter->m_pJobQ->m_pObserverThread->ResumeThread();
48             pExecuter->m_pExecuterThread->SuspendThread();
49         }
50     }
51     if(pExecuter->m_pJobQ != NULL)
52     {
53         pExecuter->m_pJobQ->deleteJobExecuter(pExecuter);
54     }
55     else
56     {
57         delete pExecuter;
58     }
59     return 0;
60 }
61 void CJobExecuter::execute(CJob* pJob)
62 {
63     this->m_pJob2Do = pJob;
64     ::Sleep(0);
65     this->m_pExecuterThread->ResumeThread();
66 }
67 void CJobExecuter::stop()
68 {
69     this->m_flag = STOP_WORKING;
70     this->m_pExecuterThread->ResumeThread();
71 }

MThreadedJobQ.h文件

  1 typedef CTypedPtrList< CPtrList ,CJob*>CJobQList;
  2 //线程池任务队列
  3 class CMThreadedJobQ
  4 {
  5 public:
  6     typedef struct THNODE
  7     {
  8         CJobExecuter* pExecuter;
  9         THNODE * pNext ;
 10     } THNODE;
 11
 12     CMThreadedJobQ();
 13     virtual ~CMThreadedJobQ();
 14     //================================================================================================
 15     //函数名:                  deleteJobExecuter
 16     //函数描述:                删除一个JobExecuter对象
 17     //输入:                    [in] pEx
 18     //输出:                    无
 19     //返回:                    无
 20     //================================================================================================
 21     void deleteJobExecuter(CJobExecuter *pEx);
 22     //================================================================================================
 23     //函数名:                  setMaxNoOfExecuter
 24     //函数描述:                设置CJobExecuter的个数
 25     //输入:                    [in] value
 26     //输出:                    无
 27     //返回:                    无
 28     //================================================================================================
 29     void setMaxNoOfExecuter(int value);
 30     //================================================================================================
 31     //函数名:                  addJobExecuter
 32     //函数描述:                添加一个CJobExecuter
 33     //输入:                    [in] pEx
 34     //输出:                    无
 35     //返回:                    无
 36     //================================================================================================
 37     void addJobExecuter(CJobExecuter *pEx);
 38     //================================================================================================
 39     //函数名:                  getJobExecuter
 40     //函数描述:                返回一个CJobExecuter
 41     //输入:                    无
 42     //输出:                    无
 43     //返回:                    处理任务的指针
 44     //================================================================================================
 45     CJobExecuter* getJobExecuter();
 46     //================================================================================================
 47     //函数名:                  addFreeJobExecuter
 48     //函数描述:                添加一个CJobExecuter
 49     //输入:                    [in] pEx
 50     //输出:                    无
 51     //返回:                    无
 52     //================================================================================================
 53     void addFreeJobExecuter(CJobExecuter *pEx);
 54     //================================================================================================
 55     //函数名:                  addJob
 56     //函数描述:                添加一个任务
 57     //输入:                    [in] pJob
 58     //输出:                    无
 59     //返回:                    无
 60     //================================================================================================
 61     void addJob(CJob *pJob);
 62     //================================================================================================
 63     //函数名:                  getMaxNoOfExecuter
 64     //函数描述:                获取CJobExecuter个数的最大值
 65     //输入:                    无
 66     //输出:                    无
 67     //返回:                    无
 68     //================================================================================================
 69     int getMaxNoOfExecuter();
 70     //================================================================================================
 71     //函数名:                  getNoOfExecuter
 72     //函数描述:                获取当前CJobExecuter的个数
 73     //输入:                    无
 74     //输出:                    无
 75     //返回:                    无
 76     //================================================================================================
 77     int getNoOfExecuter();
 78     static UINT JobObserverThreadFunction(LPVOID);
 79     //================================================================================================
 80     //函数名:                  pause
 81     //函数描述:                挂起JobObserverThread线程
 82     //输入:                    无
 83     //输出:                    无
 84     //返回:                    无
 85     //================================================================================================
 86     void pause();
 87     //================================================================================================
 88     //函数名:                  resume
 89     //函数描述:                唤醒JobObserverThread线程
 90     //输入:                    无
 91     //输出:                    无
 92     //返回:                    无
 93     //================================================================================================
 94     void resume();
 95     CWinThread* m_pObserverThread; //向空闲的executer线程添加任务的线程
 96     CCriticalSection m_cs;         //关键代码段,用于互斥
 97     CJobQList m_jobQList;          //任务队列
 98 private :
 99     BOOL m_pause;                  //JobObserverThread线程运行标记
100     int m_MaxNoOfExecuter;         //CJobExecuter最大个数
101     int m_NoOfExecuter;            //当前CJobExecuter个数
102     THNODE* m_pFreeEList;          //维护空闲处理任务线程的队列
103     THNODE* m_pAllEList;           //维护所有处理任务线程的队列
104 };

MThreadedJobQ.cpp文件:

  1 CMThreadedJobQ::CMThreadedJobQ()
  2 {
  3     m_MaxNoOfExecuter = 2;
  4     m_pause = FALSE;
  5     m_pObserverThread = AfxBeginThread(JobObserverThreadFunction,this);
  6     m_pFreeEList =NULL;
  7     m_NoOfExecuter =0;
  8     m_pAllEList = NULL;
  9 }
 10 CMThreadedJobQ::~CMThreadedJobQ()
 11 {
 12     THNODE* pTempNode;
 13     while (m_pAllEList != NULL)
 14     {
 15         pTempNode = m_pAllEList->pNext;
 16         delete m_pAllEList->pExecuter;
 17         delete m_pAllEList;
 18         m_pAllEList = pTempNode;
 19     }
 20     while (m_pFreeEList != NULL)
 21     {    pTempNode = m_pFreeEList->pNext;
 22         delete m_pFreeEList;
 23         m_pFreeEList = pTempNode;
 24     }
 25
 26     m_pObserverThread->ExitInstance();
 27     delete m_pObserverThread;
 28 }
 29 void CMThreadedJobQ::pause()
 30 {
 31     this->m_pause = TRUE;
 32 }
 33 void CMThreadedJobQ::resume()
 34 {
 35     this->m_pause = FALSE;
 36     this->m_pObserverThread->ResumeThread();
 37 }
 38 UINT CMThreadedJobQ::JobObserverThreadFunction(LPVOID pParam)
 39 {
 40     CMThreadedJobQ *pMTJQ = (CMThreadedJobQ *)pParam;
 41     CJobExecuter *pJExecuter;
 42     while(TRUE)
 43     {
 44         Sleep(100);
 45         if(pMTJQ->m_pause != TRUE)
 46         {
 47             while(!pMTJQ->m_jobQList.IsEmpty() )
 48             {
 49                 pJExecuter = pMTJQ->getJobExecuter();
 50                 if( pJExecuter!=NULL)
 51                 {
 52                     pMTJQ->m_cs.Lock();
 53                     pJExecuter->execute(pMTJQ->m_jobQList.GetHead());
 54                     pMTJQ->m_jobQList.RemoveHead();
 55                     AfxGetApp()->m_pMainWnd->PostMessage(REFRESH_LIST);
 56                     pMTJQ->m_cs.Unlock();
 57                 }
 58                 else
 59                 {
 60                     break;
 61                 }
 62                 if(pMTJQ->m_pause == TRUE)
 63                     break;
 64             }
 65         }
 66         pMTJQ->m_pObserverThread->SuspendThread();
 67     }
 68     return 0;
 69 }
 70 int CMThreadedJobQ::getNoOfExecuter()
 71 {
 72     return this->m_NoOfExecuter;
 73 }
 74
 75 int CMThreadedJobQ::getMaxNoOfExecuter()
 76 {
 77     return this->m_MaxNoOfExecuter;
 78 }
 79 void CMThreadedJobQ::addJob(CJob *pJob)
 80 {
 81     CJob * pTempJob;
 82     CSingleLock sLock(&this->m_cs);
 83     sLock.Lock();
 84     POSITION pos,lastPos;
 85     pos = this->m_jobQList.GetHeadPosition();
 86     lastPos = pos;
 87     if(pos != NULL)
 88         pTempJob =this->m_jobQList.GetHead();
 89     while(pos != NULL )
 90     {
 91         if( pJob->getPriority() > pTempJob->getPriority())
 92             break;
 93         lastPos = pos;
 94         pTempJob =     this->m_jobQList.GetNext(pos);
 95     }
 96     if(pos == NULL)
 97         this->m_jobQList.AddTail(pJob);
 98     else
 99         this->m_jobQList.InsertBefore(lastPos,pJob);
100     this->m_pObserverThread->ResumeThread();
101     sLock.Unlock();
102 }
103 void CMThreadedJobQ::addFreeJobExecuter(CJobExecuter *pEx)
104 {
105     m_cs.Lock();
106     THNODE* node = new THNODE;
107     node->pExecuter = pEx;
108     node->pNext = this->m_pFreeEList;
109     this->m_pFreeEList = node;
110     m_cs.Unlock();
111 }
112 CJobExecuter* CMThreadedJobQ::getJobExecuter()
113 {
114     THNODE *pTemp;
115     CJobExecuter *pEx=NULL;
116     m_cs.Lock();
117     if(this->m_pFreeEList != NULL)  //有空闲CJobExecuter,就返回
118     {
119         pTemp = this->m_pFreeEList;
120         this->m_pFreeEList = this->m_pFreeEList->pNext;
121         pEx = pTemp->pExecuter;
122         delete pTemp ;
123         m_cs.Unlock();
124         return pEx;
125     }
126     if(this->m_NoOfExecuter < this->m_MaxNoOfExecuter) //没有空闲CJobExecuter,并且当前CJobExecuter小于最大值,就生成一个新的CJobExecuter
127     {
128         pEx =  new CJobExecuter(this);
129         this->addJobExecuter(pEx);
130         this->m_NoOfExecuter++;
131         m_cs.Unlock();
132         return pEx;
133     }
134     m_cs.Unlock();
135     return NULL;
136 }
137 void CMThreadedJobQ::addJobExecuter(CJobExecuter *pEx)
138 {
139     m_cs.Lock();
140     THNODE* node = new THNODE;
141     node->pExecuter= pEx;
142     node->pNext = this->m_pAllEList;
143     this->m_pAllEList = node;
144     m_cs.Unlock();
145 }
146 void CMThreadedJobQ::setMaxNoOfExecuter(int value)
147 {
148     this->m_cs.Lock();
149     if(value >1 && value <11)
150         this->m_MaxNoOfExecuter = value;
151     m_pObserverThread->ResumeThread();
152     this->m_cs.Unlock();
153 }
154 void CMThreadedJobQ::deleteJobExecuter(CJobExecuter *pEx)
155 {
156     THNODE* pNode,*pNodeP;
157     CSingleLock singleLock(&m_cs);
158     singleLock.Lock();
159     if(this->m_pAllEList != NULL)
160     {
161         pNode = this->m_pAllEList;
162         if(pNode->pExecuter == pEx )
163         {
164           this->m_pAllEList = pNode->pNext;
165           delete pNode;
166         }
167         else
168         {
169             pNodeP =pNode;
170             pNode  = pNode->pNext ;
171             while(pNode != NULL )
172             {
173                 if(pNode->pExecuter== pEx ) break;
174                 pNodeP = pNode;
175                 pNode  = pNode->pNext ;
176             }
177             if(pNode!= NULL)
178             {
179                 pNodeP->pNext = pNode->pNext;
180                 delete pNode;
181             }
182         }
183     }
184     this->m_NoOfExecuter--;
185     singleLock.Unlock();
186     pEx->stop();
187     Sleep(1);
188     delete pEx;
189 }

可伸缩多线程任务队列(转载),布布扣,bubuko.com

时间: 07-18

可伸缩多线程任务队列(转载)的相关文章

VC++6.0 下配置 pthread库2010年12月12日 星期日 13:14VC下的pthread多线程编程 转载

VC++6.0 下配置 pthread库2010年12月12日 星期日 13:14VC下的pthread多线程编程     转载 #include <stdio.h>#include <stdlib.h>#include <pthread.h> void* tprocess1(void* args){       int i=1;       while(i<=10){            printf("process1:%d\n",i);

基于事件的 NIO 多线程服务器--转载

JDK1.4 的 NIO 有效解决了原有流式 IO 存在的线程开销的问题,在 NIO 中使用多线程,主要目的已不是为了应对每个客户端请求而分配独立的服务线程,而是通过多线程充分使用用多个 CPU 的处理能力和处理中的等待时间,达到提高服务能力的目的. 多线程的引入,容易为本来就略显复杂的 NIO 代码进一步降低可读性和可维护性.引入良好的设计模型,将不仅带来高性能.高可靠的代码,也将带来一个惬意的开发过程. 线程模型 NIO 的选择器采用了多路复用(Multiplexing)技术,可在一个选择器

基于多线程任务队列执行时间测试——泛型单例模式落地

目录 基于多线程任务队列执行时间测试--泛型单例模式落地 1.需求 2.遇到的问题 3.解决思路 4.具体代码 4.1泛型单例 4.2 开始时间实体 4.3 实例化单例 4.4 获取任务结束时间 5.小结 5.1 本文提供了单例模式实际应用中的一次落地: 5.2 单例模式适用于全局不变的实例: 5.3 泛型实现单例,适用于不同的数据类型实例: 5.4 单例可以跨线程,内存级别共享. 基于多线程任务队列执行时间测试--泛型单例模式落地 1.需求 比如有100个任务,多线程异步执行,统计执行完所有任

java创建多线程(转载)

转载自:Java创建线程的两个方法 Java提供了线程类Thread来创建多线程的程序.其实,创建线程与创建普通的类的对象的操作是一样的,而线程就是Thread类或其子类的实例对象.每个Thread对象描述了一个单独的线程.要产生一个线程,有两种方法: ◆需要从Java.lang.Thread类派生一个新的线程类,重载它的run()方法: ◆实现Runnalbe接口,重载Runnalbe接口中的run()方法. 为什么Java要提供两种方法来创建线程呢?它们都有哪些区别?相比而言,哪一种方法更好

Java多线程总结(转载)

林炳文Evankaka原创作品.转载请注明出处http://blog.csdn.net/evankaka 写在前面的话:此文只能说是Java多线程的一个入门,其实Java里头线程完全可以写一本书了,但是如果最基本的你都学掌握好,又怎么能更上一个台阶呢?如果你觉得此文很简单,那推荐你看看Java并发包的的线程池(Java并发编程与技术内幕:线程池深入理解),或者看这个专栏:Java并发编程与技术内幕.你将会对Java里头的高并发场景下的线程有更加深刻的理解. 目录(?)[-] 一扩展javalan

JAVA多线程 问题 转载

1.Java 中多线程同步是什么? 在多线程程序下,同步能实现控制对共享资源的访问.如果没有同步,当一个 Java 线程在修改一个共享变量时,另外一个线程正在使用或者更新同一个变量,这样容易导致程序出现错误的结果. 2.解释实现多线程的几种方法? 1>  Java 线程可以实现 Runnable 接口:(当你打算多重继承时,优先选择实现 Runnable) [java] class Foo implements Runnable { public void run(){ //... } } Th

详解iOS多线程 (转载)

iPhone 中的线程应用并不是无节制的,官方给出的资料显示iPhone OS下的主线程的堆栈大小是1M,第二个线程开始都是512KB.并且该值不能通过编译器开关或线程API函数来更改. 只有主线程有直接修改UI的能力. 一. NSOperation和NSOperationQueue 1.一个继承自 NSOperation的操作类,该类的实现中必须有 - (void)main方法的. 2.使用NSOperation的最简单方法就是将其放入NSOperationQueue中. 一旦一个操作被加入队

[转载]Python 资源大全

原文链接:Python 资源大全 环境管理 管理 Python 版本和环境的工具 p – 非常简单的交互式 python 版本管理工具. pyenv – 简单的 Python 版本管理工具. Vex – 可以在虚拟环境中执行命令. virtualenv – 创建独立 Python 环境的工具. virtualenvwrapper– virtualenv 的一组扩展. 包管理 管理包和依赖的工具. pip – Python 包和依赖关系管理工具. pip-tools – 保证 Python 包依赖

Python 库大全

作者:Lingfeng Ai链接:http://www.zhihu.com/question/24590883/answer/92420471来源:知乎著作权归作者所有.商业转载请联系作者获得授权,非商业转载请注明出处. Awesome Python中文版来啦! 本文由 伯乐在线 - 艾凌风 翻译,Namco 校稿.未经许可,禁止转载!英文出处:github.com.欢迎加入翻译组. 原文链接:Python 资源大全 1200+收藏,600+赞,别只顾着自己私藏呀朋友们 ------------