|
CGraph
|
线程池,包含主线程池、辅助线程池、普通任务队列、优先级任务队列、线程池配置、监控线程等 More...
#include <UThreadPool.h>


Public Member Functions | |
| UThreadPool (CBool autoInit=true, const UThreadPoolConfig &config=UThreadPoolConfig()) noexcept | |
| ~UThreadPool () override | |
| CStatus | setConfig (const UThreadPoolConfig &config) |
| UThreadPoolConfig | getConfig () const |
| CStatus | init () final |
| template<typename FunctionType > | |
| auto | commit (const FunctionType &task, CIndex index=CGRAPH_DEFAULT_TASK_STRATEGY) -> std::future< decltype(std::declval< FunctionType >()())> |
| template<typename FunctionType > | |
| auto | commitWithTid (const FunctionType &task, CIndex tid, CBool enable, CBool lockable) -> std::future< decltype(std::declval< FunctionType >()())> |
| template<typename FunctionType > | |
| auto | commitWithPriority (const FunctionType &task, int priority) -> std::future< decltype(std::declval< FunctionType >()())> |
| template<typename FunctionType > | |
| CVoid | execute (const FunctionType &task, CIndex index=CGRAPH_DEFAULT_TASK_STRATEGY) |
| CStatus | submit (const UTaskGroup &taskGroup, CMSec ttl=CGRAPH_MAX_BLOCK_TTL) |
| 遍历入参taskGroup的task_arr_,通过commit接口提交到对应的线程池或者任务队列; 将commit返回的std::future对象存入futures数组; 根据taskGtoup的TTL和本线程池配置的TTL计算futures等待的timeout (deadline), 在deadline到达之前遍历等到所有的std::future对象返回,并根据返回值统计status; 最后执行taskGroup自带的on_finished_接口 More... | |
| CStatus | submit (CGRAPH_DEFAULT_CONST_FUNCTION_REF func, CMSec ttl=CGRAPH_MAX_BLOCK_TTL, CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished=nullptr) |
| CIndex | getThreadIndex (CSize tid) |
| CStatus | destroy () final |
| CBool | isInit () const |
| CStatus | createSecondaryThread (CInt size) |
| CStatus | releaseSecondaryThread (CInt size) |
| template<typename FunctionType > | |
| CGRAPH_NAMESPACE_BEGIN auto | commit (const FunctionType &task, CIndex index) -> std::future< decltype(std::declval< FunctionType >()())> |
Public Member Functions inherited from CObject | |
| CObject ()=default | |
| virtual | ~CObject ()=default |
Protected Member Functions | |
| virtual CIndex | dispatch (CIndex origIndex) |
| CVoid | monitor () |
Protected Member Functions inherited from UThreadObject | |
| CStatus | run () override |
Protected Member Functions inherited from UtilsObject | |
| CStatus | run () override |
Private Attributes | |
| CBool | is_init_ { false } |
| CInt | cur_index_ = 0 |
| UAtomicQueue< UTask > | task_queue_ |
| UAtomicPriorityQueue< UTask > | priority_task_queue_ |
| std::vector< UThreadPrimaryPtr > | primary_threads_ |
| std::list< std::unique_ptr< UThreadSecondary > > | secondary_threads_ |
| UThreadPoolConfig | config_ |
| std::thread | monitor_thread_ |
| std::map< CSize, int > | thread_record_map_ |
| std::mutex | st_mutex_ |
线程池,包含主线程池、辅助线程池、普通任务队列、优先级任务队列、线程池配置、监控线程等
|
explicitnoexcept |
通过默认设置参数,来创建线程池
| autoInit | 是否自动开启线程池功能 |
| config |
|
override |
析构函数

| CGRAPH_NAMESPACE_BEGIN auto UThreadPool::commit | ( | const FunctionType & | task, |
| CIndex | index | ||
| ) | -> std::future<decltype(std::declval<FunctionType>()())> |
如果是长时间任务,则交给特定的任务队列,仅由辅助线程处理 目的是防止有很多长时间任务,将所有运行的线程均阻塞 长任务程序,默认优先级较低
| auto UThreadPool::commit | ( | const FunctionType & | task, |
| CIndex | index = CGRAPH_DEFAULT_TASK_STRATEGY |
||
| ) | -> std::future< decltype(std::declval< FunctionType >()())> |
提交任务信息并分发到对应的任务队列(普通or优先)和线程池(主or辅) 入参index可以指定线程id,但是实际上仍然需要通过dispatch来真正分发 分发到的index在主线程index范围内的,提交到主线程池的执行(primary_threads_) 指定的index为长时间任务的,只能分发到优先级队列执行(priority_task_queue_) 其他情况下分发到默认的任务队列执行(task_queue_) 返回的是将task封装成std::packaged_task后get_future返回的std::future对象
| FunctionType |
| task | |
| index |
| auto UThreadPool::commitWithPriority | ( | const FunctionType & | task, |
| int | priority | ||
| ) | -> std::future<decltype(std::declval<FunctionType>()())> |
根据优先级,执行任务
| FunctionType |
| task | |
| priority | 优先级别。自然序从大到小依次执行 |
| auto UThreadPool::commitWithTid | ( | const FunctionType & | task, |
| CIndex | tid, | ||
| CBool | enable, | ||
| CBool | lockable | ||
| ) | -> std::future<decltype(std::declval<FunctionType>()())> |
向特定的线程id中,提交任务信息
| FunctionType |
| task | |
| tid | 线程id。如果超出主线程个数范围,则默认写入pool的通用队列中 |
| enable | 是否启用上锁/解锁功能 |
| lockable | 上锁(true) / 解锁(false) |
生成辅助线程。内部确保辅助线程数量不超过设定参数 最大线程数量(max) ≥ 主线程数量(default) + 已有辅助线程数量(secondary)
| size |
|
finalvirtual |
释放所有的线程信息
这里之所以 destroy和 delete分开两个循环执行, 是因为当前线程被delete后,还可能存在未被delete的主线程,来steal当前线程的任务 在windows环境下,可能出现问题。 destroy 和 delete 分开之后,不会出现此问题。 感谢 Ryan大佬(https://github.com/ryanhuang) 提供的帮助
Reimplemented from CObject.

根据传入的策略信息,确定最终执行方式
| origIndex |
| CVoid UThreadPool::execute | ( | const FunctionType & | task, |
| CIndex | index = CGRAPH_DEFAULT_TASK_STRATEGY |
||
| ) |
异步执行任务
| FunctionType |
| task | |
| index |

| UThreadPoolConfig UThreadPool::getConfig | ( | ) | const |
获取线程池配置信息
获取根据线程id信息,获取线程index信息 thread_record_map_会维护tid到线程index的映射关系,通过查表得到index
| tid |
|
finalvirtual |
开启所有的线程信息
等待所有thread 设置完毕之后,再进行 init(), 避免在个别的平台上,可能出现 thread竞争访问其他线程、并且导致异常的情况 参考: https://github.com/ChunelFeng/CGraph/issues/309
策略更新: 初始化的时候,也可以创建n个辅助线程。目的是为了配合仅使用 pool中 priority_queue 的场景 一般情况下,建议为0。
Reimplemented from CObject.

| CBool UThreadPool::isInit | ( | ) | const |
判断线程池是否已经初始化了
|
protected |
监控线程执行函数,主要是判断是否需要增加线程,或销毁线程 增/删 操作,仅针对secondary类型线程生效

| CStatus UThreadPool::setConfig | ( | const UThreadPoolConfig & | config | ) |
设置线程池相关配置信息,需要在init()函数调用前,完成设置
| config |
| CStatus UThreadPool::submit | ( | CGRAPH_DEFAULT_CONST_FUNCTION_REF | func, |
| CMSec | ttl = CGRAPH_MAX_BLOCK_TTL, |
||
| CGRAPH_CALLBACK_CONST_FUNCTION_REF | onFinished = nullptr |
||
| ) |
针对单个任务的情况,复用任务组信息,实现单个任务直接执行
| task | |
| ttl | |
| onFinished |

| CStatus UThreadPool::submit | ( | const UTaskGroup & | taskGroup, |
| CMSec | ttl = CGRAPH_MAX_BLOCK_TTL |
||
| ) |
遍历入参taskGroup的task_arr_,通过commit接口提交到对应的线程池或者任务队列; 将commit返回的std::future对象存入futures数组; 根据taskGtoup的TTL和本线程池配置的TTL计算futures等待的timeout (deadline), 在deadline到达之前遍历等到所有的std::future对象返回,并根据返回值统计status; 最后执行taskGroup自带的on_finished_接口
执行任务组信息 取taskGroup内部ttl和入参ttl的最小值,为计算ttl标准
| taskGroup | |
| ttl |
| taskGroup | |
| ttl |

|
private |
|
private |
|
private |
|
private |
|
private |
|
private |
|
private |
|
private |
|
private |
|
private |