Updated jobthread.h.

This commit is contained in:
Tony Paloma 2012-11-12 21:10:32 -05:00
parent 2b684d874f
commit c62127dca8

View File

@ -1,4 +1,4 @@
//========== Copyright © 2005, Valve Corporation, All rights reserved. ========
//========== Copyright ? 2005, Valve Corporation, All rights reserved. ========
//
// Purpose: A utility for a discrete job-oriented worker thread.
//
@ -97,15 +97,15 @@ enum JobPriority_t
#define TP_MAX_POOL_THREADS 64
struct ThreadPoolStartParams_t
{
ThreadPoolStartParams_t( bool ioThreads = false, int threads = -1, int *pAffinities = NULL, ThreeState_t distribute = TRS_NONE, unsigned stackSize = 0, int threadPriority = SHRT_MIN )
: nThreads( threads ), fDistribute( distribute ), nStackSize( stackSize ), iThreadPriority( threadPriority ), bIOThreads( ioThreads )
ThreadPoolStartParams_t( bool bIOThreads = false, unsigned nThreads = -1, int *pAffinities = NULL, ThreeState_t fDistribute = TRS_NONE, unsigned nStackSize = -1, int iThreadPriority = SHRT_MIN )
: bIOThreads( bIOThreads ), nThreads( nThreads ), fDistribute( fDistribute ), nStackSize( nStackSize ), iThreadPriority( iThreadPriority ), nThreadsMax( -1 )
{
bUseAffinityTable = ( pAffinities != NULL ) && ( fDistribute == TRS_TRUE ) && ( nThreads != 0 );
bUseAffinityTable = ( pAffinities != NULL ) && ( fDistribute == TRS_TRUE ) && ( nThreads != -1 );
if ( bUseAffinityTable )
{
// user supplied an optional 1:1 affinity mapping to override normal distribute behavior
nThreads = MIN( TP_MAX_POOL_THREADS, nThreads );
for ( int i = 0; i < nThreads; i++ )
for ( unsigned int i = 0; i < nThreads; i++ )
{
iAffinityTable[i] = pAffinities[i];
}
@ -113,6 +113,7 @@ struct ThreadPoolStartParams_t
}
int nThreads;
int nThreadsMax;
ThreeState_t fDistribute;
int nStackSize;
int iThreadPriority;
@ -137,6 +138,7 @@ enum ThreadPoolMessages_t
{
TPM_EXIT, // Exit the thread
TPM_SUSPEND, // Suspend after next operation
TPM_RUNFUNCTOR, // Run functor, reply when done.
};
//---------------------------------------------------------
@ -168,7 +170,7 @@ public:
//-----------------------------------------------------
// Offer the current thread to the pool
//-----------------------------------------------------
virtual int YieldWait( CThreadEvent *pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
virtual int YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
virtual int YieldWait( CJob **, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
virtual void Yield( unsigned timeout ) = 0;
@ -180,6 +182,13 @@ public:
//-----------------------------------------------------
virtual void AddJob( CJob * ) = 0;
//-----------------------------------------------------
// All threads execute pFunctor asap. Thread will either wake up
// and execute or execute pFunctor right after completing current job and
// before looking for another job.
//-----------------------------------------------------
virtual void ExecuteHighPriorityFunctor( CFunctor *pFunctor ) = 0;
//-----------------------------------------------------
// Add an function object to the queue (master thread)
//-----------------------------------------------------
@ -429,13 +438,14 @@ class CJob : public CRefCounted1<IRefCounted, CRefCountServiceMT>
public:
CJob( JobPriority_t priority = JP_NORMAL )
: m_status( JOB_STATUS_UNSERVICED ),
m_ThreadPoolData( JOB_NO_DATA ),
m_priority( priority ),
m_flags( 0 ),
m_iServicingThread( -1 ),
m_ThreadPoolData( JOB_NO_DATA ),
m_pThreadPool( NULL ),
m_CompleteEvent( true )
m_CompleteEvent( true ),
m_iServicingThread( -1 )
{
m_szDescription[ 0 ] = 0;
}
//-----------------------------------------------------
@ -462,13 +472,18 @@ public:
bool CanExecute() const { return ( m_status == JOB_STATUS_PENDING || m_status == JOB_STATUS_UNSERVICED ); }
bool IsFinished() const { return ( m_status != JOB_STATUS_PENDING && m_status != JOB_STATUS_INPROGRESS && m_status != JOB_STATUS_UNSERVICED ); }
JobStatus_t GetStatus() const { return m_status; }
/// Slam the status to a particular value. This is named "slam" instead of "set,"
/// to warn you that it should only be used in unusual situations. Otherwise, the
/// job manager really should manage the status for you, and you should not manhandle it.
void SlamStatus(JobStatus_t s) { m_status = s; }
//-----------------------------------------------------
// Try to acquire ownership (to satisfy). If you take the lock, you must either execute or abort.
//-----------------------------------------------------
bool TryLock() const { return m_mutex.TryLock(); }
void Lock() const { m_mutex.Lock(); }
void Unlock() const { m_mutex.Unlock(); }
bool TryLock() { return m_mutex.TryLock(); }
void Lock() { m_mutex.Lock(); }
void Unlock() { m_mutex.Unlock(); }
//-----------------------------------------------------
// Thread event support (safe for NULL this to simplify code )
@ -490,7 +505,18 @@ public:
//-----------------------------------------------------
JobStatus_t Abort( bool bDiscard = true );
virtual char const *Describe() { return "Job"; }
virtual char const *Describe() { return m_szDescription[ 0 ] ? m_szDescription : "Job"; }
virtual void SetDescription( const char *pszDescription )
{
if( pszDescription )
{
Q_strncpy( m_szDescription, pszDescription, sizeof( m_szDescription ) );
}
else
{
m_szDescription[ 0 ] = 0;
}
}
private:
//-----------------------------------------------------
@ -498,13 +524,14 @@ private:
JobStatus_t m_status;
JobPriority_t m_priority;
CThreadFastMutex m_mutex;
CThreadMutex m_mutex;
unsigned char m_flags;
char m_iServicingThread;
short m_reserved;
ThreadPoolData_t m_ThreadPoolData;
IThreadPool * m_pThreadPool;
CThreadEvent m_CompleteEvent;
char m_szDescription[ 32 ];
private:
//-----------------------------------------------------
@ -679,13 +706,12 @@ private:
// Work splitting: array split, best when cost per item is roughly equal
//-----------------------------------------------------------------------------
#ifdef _MSC_VER
#pragma warning(push)
#pragma warning(disable:4389)
#pragma warning(disable:4018)
#pragma warning(disable:4701)
#endif
#if !defined(__clang__) && ( !defined( GNUC ) || ( defined( GNUC ) && __GNUC_MINOR__ < 3 ) )
#define DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL(N) \
template <typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \
void IterRangeParallel(FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \
@ -755,7 +781,7 @@ FUNC_GENERATE_ALL( DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL );
}
FUNC_GENERATE_ALL( DEFINE_MEMBER_ITER_RANGE_PARALLEL );
#endif // GCC
//-----------------------------------------------------------------------------
// Work splitting: competitive, best when cost per item varies a lot
@ -820,9 +846,10 @@ template <typename ITEM_TYPE, class ITEM_PROCESSOR_TYPE>
class CParallelProcessor
{
public:
CParallelProcessor()
CParallelProcessor( const char *pszDescription )
{
m_pItems = m_pLimit= 0;
m_szDescription = pszDescription;
}
void Run( ITEM_TYPE *pItems, unsigned nItems, int nMaxParallel = INT_MAX, IThreadPool *pThreadPool = NULL )
@ -847,7 +874,7 @@ public:
if (! pThreadPool ) // only possible on linux
{
DoExecute();
DoExecute( );
return;
}
@ -865,6 +892,7 @@ public:
while( i-- )
{
jobs[i] = pThreadPool->QueueCall( this, &CParallelProcessor<ITEM_TYPE, ITEM_PROCESSOR_TYPE>::DoExecute );
jobs[i]->SetDescription( m_szDescription );
}
DoExecute();
@ -910,34 +938,45 @@ private:
}
CInterlockedPtr<ITEM_TYPE> m_pItems;
ITEM_TYPE * m_pLimit;
const char * m_szDescription;
};
template <typename ITEM_TYPE>
inline void ParallelProcess( ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
inline void ParallelProcess( const char *pszDescription, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
{
CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor;
CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor( pszDescription );
processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
processor.Run( pItems, nItems, nMaxParallel );
}
template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
inline void ParallelProcess( ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
inline void ParallelProcess( const char *pszDescription, ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
{
CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor;
CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor( pszDescription );
processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
processor.Run( pItems, nItems, nMaxParallel );
}
// Parallel Process that lets you specify threadpool
template <typename ITEM_TYPE>
inline void ParallelProcess( const char *pszDescription, IThreadPool *pPool, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
{
CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor( pszDescription );
processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
processor.Run( pItems, nItems, nMaxParallel, pPool );
}
template <class ITEM_PROCESSOR_TYPE>
class CParallelLoopProcessor
{
public:
CParallelLoopProcessor()
CParallelLoopProcessor( const char *pszDescription )
{
m_lIndex = m_lLimit= 0;
m_nActive = 0;
m_szDescription = pszDescription;
}
void Run( long lBegin, long nItems, int nMaxParallel = INT_MAX )
@ -949,9 +988,9 @@ public:
int i = g_pThreadPool->NumIdleThreads();
if ( nMaxParallel < i)
{
{
i = nMaxParallel;
}
}
while( i-- )
{
@ -998,20 +1037,21 @@ private:
CInterlockedInt m_lIndex;
long m_lLimit;
CInterlockedInt m_nActive;
const char * m_szDescription;
};
inline void ParallelLoopProcess( long lBegin, unsigned nItems, void (*pfnProcess)( long const & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
inline void ParallelLoopProcess( const char *szDescription, long lBegin, unsigned nItems, void (*pfnProcess)( long const & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
{
CParallelLoopProcessor< CFuncJobItemProcessor< long const > > processor;
CParallelLoopProcessor< CFuncJobItemProcessor< long const > > processor( szDescription );
processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
processor.Run( lBegin, nItems, nMaxParallel );
}
template < typename OBJECT_TYPE, typename FUNCTION_CLASS >
inline void ParallelLoopProcess( long lBegin, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( long const & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
inline void ParallelLoopProcess( const char *szDescription, long lBegin, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( long const & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
{
CParallelLoopProcessor< CMemberFuncJobItemProcessor<long const, OBJECT_TYPE, FUNCTION_CLASS> > processor;
CParallelLoopProcessor< CMemberFuncJobItemProcessor<long const, OBJECT_TYPE, FUNCTION_CLASS> > processor( szDescription );
processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
processor.Run( lBegin, nItems, nMaxParallel );
}
@ -1028,6 +1068,11 @@ public:
CParallelProcessorBase()
{
m_nActive = 0;
m_szDescription = NULL;
}
void SetDescription( const char *pszDescription )
{
m_szDescription = pszDescription;
}
protected:
@ -1080,6 +1125,7 @@ private:
}
CInterlockedInt m_nActive;
const char * m_szDescription;
};
@ -1160,7 +1206,8 @@ inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T
inline bool IThreadPool::YieldWait( CThreadEvent &event, unsigned timeout )
{
return ( YieldWait( &event, 1, true, timeout ) != TW_TIMEOUT );
CThreadEvent *pEvent = &event;
return ( YieldWait( &pEvent, 1, true, timeout ) != TW_TIMEOUT );
}
inline bool IThreadPool::YieldWait( CJob *pJob, unsigned timeout )
@ -1275,3 +1322,4 @@ inline JobStatus_t CJob::Abort( bool bDiscard )
//-----------------------------------------------------------------------------
#endif // JOBTHREAD_H