CGraph
Public Member Functions | Protected Member Functions | Private Attributes | List of all members
UThreadPool Class Reference

线程池,包含主线程池、辅助线程池、普通任务队列、优先级任务队列、线程池配置、监控线程等 More...

#include <UThreadPool.h>

Inheritance diagram for UThreadPool:
Inheritance graph
[legend]
Collaboration diagram for UThreadPool:
Collaboration graph
[legend]

Public Member Functions

 UThreadPool (CBool autoInit=true, const UThreadPoolConfig &config=UThreadPoolConfig()) noexcept
 
 ~UThreadPool () override
 
CStatus setConfig (const UThreadPoolConfig &config)
 
UThreadPoolConfig getConfig () const
 
CStatus init () final
 
template<typename FunctionType >
auto commit (const FunctionType &task, CIndex index=CGRAPH_DEFAULT_TASK_STRATEGY) -> std::future< decltype(std::declval< FunctionType >()())>
 
template<typename FunctionType >
auto commitWithTid (const FunctionType &task, CIndex tid, CBool enable, CBool lockable) -> std::future< decltype(std::declval< FunctionType >()())>
 
template<typename FunctionType >
auto commitWithPriority (const FunctionType &task, int priority) -> std::future< decltype(std::declval< FunctionType >()())>
 
template<typename FunctionType >
CVoid execute (const FunctionType &task, CIndex index=CGRAPH_DEFAULT_TASK_STRATEGY)
 
CStatus submit (const UTaskGroup &taskGroup, CMSec ttl=CGRAPH_MAX_BLOCK_TTL)
 遍历入参taskGroup的task_arr_,通过commit接口提交到对应的线程池或者任务队列; 将commit返回的std::future对象存入futures数组; 根据taskGtoup的TTL和本线程池配置的TTL计算futures等待的timeout (deadline), 在deadline到达之前遍历等到所有的std::future对象返回,并根据返回值统计status; 最后执行taskGroup自带的on_finished_接口 More...
 
CStatus submit (CGRAPH_DEFAULT_CONST_FUNCTION_REF func, CMSec ttl=CGRAPH_MAX_BLOCK_TTL, CGRAPH_CALLBACK_CONST_FUNCTION_REF onFinished=nullptr)
 
CIndex getThreadIndex (CSize tid)
 
CStatus destroy () final
 
CBool isInit () const
 
CStatus createSecondaryThread (CInt size)
 
CStatus releaseSecondaryThread (CInt size)
 
template<typename FunctionType >
CGRAPH_NAMESPACE_BEGIN auto commit (const FunctionType &task, CIndex index) -> std::future< decltype(std::declval< FunctionType >()())>
 
- Public Member Functions inherited from CObject
 CObject ()=default
 
virtual ~CObject ()=default
 

Protected Member Functions

virtual CIndex dispatch (CIndex origIndex)
 
CVoid monitor ()
 
- Protected Member Functions inherited from UThreadObject
CStatus run () override
 
- Protected Member Functions inherited from UtilsObject
CStatus run () override
 

Private Attributes

CBool is_init_ { false }
 
CInt cur_index_ = 0
 
UAtomicQueue< UTasktask_queue_
 
UAtomicPriorityQueue< UTaskpriority_task_queue_
 
std::vector< UThreadPrimaryPtrprimary_threads_
 
std::list< std::unique_ptr< UThreadSecondary > > secondary_threads_
 
UThreadPoolConfig config_
 
std::thread monitor_thread_
 
std::map< CSize, int > thread_record_map_
 
std::mutex st_mutex_
 

Detailed Description

线程池,包含主线程池、辅助线程池、普通任务队列、优先级任务队列、线程池配置、监控线程等

Constructor & Destructor Documentation

◆ UThreadPool()

CGRAPH_NAMESPACE_BEGIN UThreadPool::UThreadPool ( CBool  autoInit = true,
const UThreadPoolConfig config = UThreadPoolConfig() 
)
explicitnoexcept

通过默认设置参数,来创建线程池

Parameters
autoInit是否自动开启线程池功能
config

◆ ~UThreadPool()

UThreadPool::~UThreadPool ( )
override

析构函数

Here is the call graph for this function:

Member Function Documentation

◆ commit() [1/2]

template<typename FunctionType >
CGRAPH_NAMESPACE_BEGIN auto UThreadPool::commit ( const FunctionType &  task,
CIndex  index 
) -> std::future<decltype(std::declval<FunctionType>()())>

如果是长时间任务,则交给特定的任务队列,仅由辅助线程处理 目的是防止有很多长时间任务,将所有运行的线程均阻塞 长任务程序,默认优先级较低

◆ commit() [2/2]

template<typename FunctionType >
auto UThreadPool::commit ( const FunctionType &  task,
CIndex  index = CGRAPH_DEFAULT_TASK_STRATEGY 
) -> std::future< decltype(std::declval< FunctionType >()())>

提交任务信息并分发到对应的任务队列(普通or优先)和线程池(主or辅) 入参index可以指定线程id,但是实际上仍然需要通过dispatch来真正分发 分发到的index在主线程index范围内的,提交到主线程池的执行(primary_threads_) 指定的index为长时间任务的,只能分发到优先级队列执行(priority_task_queue_) 其他情况下分发到默认的任务队列执行(task_queue_) 返回的是将task封装成std::packaged_task后get_future返回的std::future对象

Template Parameters
FunctionType
Parameters
task
index
Returns

◆ commitWithPriority()

template<typename FunctionType >
auto UThreadPool::commitWithPriority ( const FunctionType &  task,
int  priority 
) -> std::future<decltype(std::declval<FunctionType>()())>

根据优先级,执行任务

Template Parameters
FunctionType
Parameters
task
priority优先级别。自然序从大到小依次执行
Returns
@notice 建议,priority 范围在 [-100, 100] 之间

◆ commitWithTid()

template<typename FunctionType >
auto UThreadPool::commitWithTid ( const FunctionType &  task,
CIndex  tid,
CBool  enable,
CBool  lockable 
) -> std::future<decltype(std::declval<FunctionType>()())>

向特定的线程id中,提交任务信息

Template Parameters
FunctionType
Parameters
task
tid线程id。如果超出主线程个数范围,则默认写入pool的通用队列中
enable是否启用上锁/解锁功能
lockable上锁(true) / 解锁(false)
Returns

◆ createSecondaryThread()

CStatus UThreadPool::createSecondaryThread ( CInt  size)

生成辅助线程。内部确保辅助线程数量不超过设定参数 最大线程数量(max) ≥ 主线程数量(default) + 已有辅助线程数量(secondary)

Parameters
size
Returns

◆ destroy()

CStatus UThreadPool::destroy ( )
finalvirtual

释放所有的线程信息

Returns

这里之所以 destroy和 delete分开两个循环执行, 是因为当前线程被delete后,还可能存在未被delete的主线程,来steal当前线程的任务 在windows环境下,可能出现问题。 destroy 和 delete 分开之后,不会出现此问题。 感谢 Ryan大佬(https://github.com/ryanhuang) 提供的帮助

Reimplemented from CObject.

Here is the call graph for this function:

◆ dispatch()

CIndex UThreadPool::dispatch ( CIndex  origIndex)
protectedvirtual

根据传入的策略信息,确定最终执行方式

Parameters
origIndex
Returns

◆ execute()

template<typename FunctionType >
CVoid UThreadPool::execute ( const FunctionType &  task,
CIndex  index = CGRAPH_DEFAULT_TASK_STRATEGY 
)

异步执行任务

Template Parameters
FunctionType
Parameters
task
index
Here is the call graph for this function:

◆ getConfig()

UThreadPoolConfig UThreadPool::getConfig ( ) const

获取线程池配置信息

Returns

◆ getThreadIndex()

CIndex UThreadPool::getThreadIndex ( CSize  tid)

获取根据线程id信息,获取线程index信息 thread_record_map_会维护tid到线程index的映射关系,通过查表得到index

Parameters
tid
Returns
@notice 辅助线程返回-1

◆ init()

CStatus UThreadPool::init ( )
finalvirtual

开启所有的线程信息

Returns

等待所有thread 设置完毕之后,再进行 init(), 避免在个别的平台上,可能出现 thread竞争访问其他线程、并且导致异常的情况 参考: https://github.com/ChunelFeng/CGraph/issues/309

策略更新: 初始化的时候,也可以创建n个辅助线程。目的是为了配合仅使用 pool中 priority_queue 的场景 一般情况下,建议为0。

Reimplemented from CObject.

Here is the call graph for this function:

◆ isInit()

CBool UThreadPool::isInit ( ) const

判断线程池是否已经初始化了

Returns

◆ monitor()

CVoid UThreadPool::monitor ( )
protected

监控线程执行函数,主要是判断是否需要增加线程,或销毁线程 增/删 操作,仅针对secondary类型线程生效

Here is the call graph for this function:

◆ releaseSecondaryThread()

CStatus UThreadPool::releaseSecondaryThread ( CInt  size)

删除辅助线程

Parameters
size
Returns

◆ setConfig()

CStatus UThreadPool::setConfig ( const UThreadPoolConfig config)

设置线程池相关配置信息,需要在init()函数调用前,完成设置

Parameters
config
Returns
@notice 通过单例类(UThreadPoolSingleton)开启线程池,则线程池默认init。需要 destroy 后才可以设置参数

◆ submit() [1/2]

CStatus UThreadPool::submit ( CGRAPH_DEFAULT_CONST_FUNCTION_REF  func,
CMSec  ttl = CGRAPH_MAX_BLOCK_TTL,
CGRAPH_CALLBACK_CONST_FUNCTION_REF  onFinished = nullptr 
)

针对单个任务的情况,复用任务组信息,实现单个任务直接执行

Parameters
task
ttl
onFinished
Returns
Here is the call graph for this function:

◆ submit() [2/2]

CStatus UThreadPool::submit ( const UTaskGroup taskGroup,
CMSec  ttl = CGRAPH_MAX_BLOCK_TTL 
)

遍历入参taskGroup的task_arr_,通过commit接口提交到对应的线程池或者任务队列; 将commit返回的std::future对象存入futures数组; 根据taskGtoup的TTL和本线程池配置的TTL计算futures等待的timeout (deadline), 在deadline到达之前遍历等到所有的std::future对象返回,并根据返回值统计status; 最后执行taskGroup自带的on_finished_接口

执行任务组信息 取taskGroup内部ttl和入参ttl的最小值,为计算ttl标准

Parameters
taskGroup
ttl
Returns
Parameters
taskGroup
ttl
Returns
CStatus
Here is the call graph for this function:

Member Data Documentation

◆ config_

UThreadPoolConfig UThreadPool::config_
private

◆ cur_index_

CInt UThreadPool::cur_index_ = 0
private

◆ is_init_

CBool UThreadPool::is_init_ { false }
private

◆ monitor_thread_

std::thread UThreadPool::monitor_thread_
private

◆ primary_threads_

std::vector<UThreadPrimaryPtr> UThreadPool::primary_threads_
private

◆ priority_task_queue_

UAtomicPriorityQueue<UTask> UThreadPool::priority_task_queue_
private

◆ secondary_threads_

std::list<std::unique_ptr<UThreadSecondary> > UThreadPool::secondary_threads_
private

◆ st_mutex_

std::mutex UThreadPool::st_mutex_
private

◆ task_queue_

UAtomicQueue<UTask> UThreadPool::task_queue_
private

◆ thread_record_map_

std::map<CSize, int> UThreadPool::thread_record_map_
private

The documentation for this class was generated from the following files: