Improved performance and thread safety

This commit is contained in:
Nefarius 2014-06-11 16:49:23 +02:00 committed by Nicholas Hastings
parent a38bc8bae4
commit 88f78e152b
2 changed files with 105 additions and 112 deletions

View File

@ -100,7 +100,6 @@ bool CurlExt::SDK_OnLoad(char *error, size_t maxlength, bool late)
plsys->AddPluginsListener(this);
smutils->AddGameFrameHook(&OnGameFrame);
g_SessionManager.Initialize();
return true;
}
@ -575,7 +574,7 @@ void HTTPSessionManager::PluginUnloaded(IPlugin *plugin)
{
// Check for pending requests and cancel them
{
pRequestsLock->Lock();
ke::AutoLock lock(&requests_);
if (!requests.empty())
{
@ -591,44 +590,44 @@ void HTTPSessionManager::PluginUnloaded(IPlugin *plugin)
}
}
}
pRequestsLock->Unlock();
}
// Wait for running requests to finish
if (!threads.empty())
{
//for (std::list<IThreadHandle*>::iterator i(threads.begin()), end(threads.end()); i != end; ++i)
for (ke::LinkedList<IThreadHandle*>::iterator i(threads.begin()), end(threads.end()); i != end; ++i)
{
if ((*i) != NULL)
{
(*i)->WaitForThread();
// NOTE: do not remove handles here as it would break
// the iteration. Dead thread handles are destroyed
// on the next available game frame in the main thread
// Check for pending callbacks and cancel them
{
pCallbacksLock->Lock();
ke::AutoTryLock lock(&threads_);
if (!callbacks.empty())
if (threads_.Locked())
{
if (!threads.empty())
{
for (ke::LinkedList<IThreadHandle*>::iterator i(threads.begin()), end(threads.end()); i != end; ++i)
{
if ((*i) != NULL)
{
// Run through callback queue
//for (Queue<HTTPRequest>::iterator i(callbacks.begin()), end(callbacks.end()); i != end; ++i)
for (unsigned int i = 0; i < callbacks.length(); i++)
(*i)->WaitForThread();
(*i)->DestroyThis();
i = this->threads.erase(i);
// Check for pending callbacks and cancel them
{
// Identify callbacks associated to (nearly) unmapped plugin context
if (callbacks[i].pCtx == plugin->GetBaseContext())
ke::AutoLock lock(&callbacks_);
if (!callbacks.empty())
{
// All context related data and callbacks are marked invalid
callbacks[i].pCtx = NULL;
callbacks[i].contextPack.pCallbackFunction = NULL;
// Run through callback queue
for (unsigned int i = 0; i < callbacks.length(); i++)
{
// Identify callbacks associated to (nearly) unmapped plugin context
if (callbacks[i].pCtx == plugin->GetBaseContext())
{
// All context related data and callbacks are marked invalid
callbacks[i].pCtx = NULL;
callbacks[i].contextPack.pCallbackFunction = NULL;
}
}
}
}
}
pCallbacksLock->Unlock();
}
}
}
@ -648,9 +647,10 @@ void HTTPSessionManager::PostAndDownload(IPluginContext *pCtx,
request.url = url;
request.contextPack = contextPack;
pRequestsLock->Lock();
this->requests.append(request);
pRequestsLock->Unlock();
{
ke::AutoLock lock(&requests_);
this->requests.append(request);
}
}
void HTTPSessionManager::Download(IPluginContext *pCtx,
@ -666,9 +666,10 @@ void HTTPSessionManager::Download(IPluginContext *pCtx,
request.url = url;
request.contextPack = contextPack;
pRequestsLock->Lock();
this->requests.append(request);
pRequestsLock->Unlock();
{
ke::AutoLock lock(&requests_);
this->requests.append(request);
}
}
void HTTPSessionManager::BurnSessionHandle(IPluginContext *pCtx,
@ -694,124 +695,116 @@ void HTTPSessionManager::BurnSessionHandle(IPluginContext *pCtx,
void HTTPSessionManager::RunFrame()
{
// Try to execute pending callbacks
if (this->pCallbacksLock->TryLock())
{
if (!this->callbacks.empty())
ke::AutoTryLock lock(&callbacks_);
if (this->callbacks_.Locked())
{
HTTPRequest request = this->callbacks.back();
IPluginContext *pCtx = request.pCtx;
// Is the requesting plugin still alive?
if (pCtx != NULL)
if (!this->callbacks.empty())
{
funcid_t id = request.contextPack.pCallbackFunction->uPluginFunction;
IPluginFunction *pFunction = pCtx->GetFunctionById(id);
HTTPRequest request = this->callbacks.back();
IPluginContext *pCtx = request.pCtx;
if (pFunction != NULL)
// Is the requesting plugin still alive?
if (pCtx != NULL)
{
// Push data and execute callback
pFunction->PushCell(request.handles.hndlSession);
pFunction->PushCell(request.result);
pFunction->PushCell(request.handles.hndlDownloader);
if (request.contextPack.pCallbackFunction->bHasContext)
funcid_t id = request.contextPack.pCallbackFunction->uPluginFunction;
IPluginFunction *pFunction = pCtx->GetFunctionById(id);
if (pFunction != NULL)
{
pFunction->PushCell(request.contextPack.iPluginContextValue);
// Push data and execute callback
pFunction->PushCell(request.handles.hndlSession);
pFunction->PushCell(request.result);
pFunction->PushCell(request.handles.hndlDownloader);
if (request.contextPack.pCallbackFunction->bHasContext)
{
pFunction->PushCell(request.contextPack.iPluginContextValue);
}
pFunction->Execute(NULL);
}
pFunction->Execute(NULL);
}
this->callbacks.pop();
}
this->callbacks.pop();
}
this->pCallbacksLock->Unlock();
}
// Try to fire up some new asynchronous requests
if (pRequestsLock->TryLock())
{
// NOTE: this is my "burst thread creation" solution
// Using a thread pool is slow as it executes the threads
// sequentially and not parallel.
// Not using a thread pool might cause SRCDS to crash, so
// we are spawning just a few threads every frame to not
// affect performance too much and still having the advantage
// of parallel execution.
for (unsigned int i = 0; i < iMaxRequestsPerFrame; i++)
ke::AutoTryLock lock(&requests_);
if (requests_.Locked())
{
if (!this->requests.empty())
// NOTE: this is my "burst thread creation" solution
// Using a thread pool is slow as it executes the threads
// sequentially and not parallel.
// Not using a thread pool might cause SRCDS to crash, so
// we are spawning just a few threads every frame to not
// affect performance too much and still having the advantage
// of parallel execution.
for (unsigned int i = 0; i < iMaxRequestsPerFrame; i++)
{
// Create new thread object
HTTPAsyncRequestHandler *async =
new HTTPAsyncRequestHandler(this->requests.back());
// Skip requests with unloaded parent plugin
if (this->requests.back().pCtx != NULL)
if (!this->requests.empty())
{
// Create new thread
IThreadHandle *pThread =
threader->MakeThread(async, Thread_Default);
// Save thread handle
//this->threads.push_front(pThread);
this->threads.append(pThread);
// Create new thread object
HTTPAsyncRequestHandler *async =
new HTTPAsyncRequestHandler(this->requests.back());
// Skip requests with unloaded parent plugin
if (this->requests.back().pCtx != NULL)
{
// Create new thread
IThreadHandle *pThread =
threader->MakeThread(async, Thread_Default);
// Save thread handle
//this->threads.push_front(pThread);
this->threads.append(pThread);
}
// Remove request as it's being handled now
this->requests.pop();
}
// Remove request as it's being handled now
this->requests.pop();
}
}
pRequestsLock->Unlock();
}
// Do some quick "garbage collection" on finished threads
RemoveFinishedThreads();
}
void HTTPSessionManager::Initialize()
{
pRequestsLock = threader->MakeMutex();
pCallbacksLock = threader->MakeMutex();
}
void HTTPSessionManager::Shutdown()
{
// Block until all running threads have finished
this->RemoveFinishedThreads();
// Destroy all remaining callback calls
this->pCallbacksLock->Lock();
this->callbacks.clear();
this->pCallbacksLock->Unlock();
if (pRequestsLock != NULL)
{
pRequestsLock->DestroyThis();
}
if (pCallbacksLock != NULL)
{
pCallbacksLock->DestroyThis();
ke::AutoLock lock(&callbacks_);
this->callbacks.clear();
}
}
void HTTPSessionManager::AddCallback(HTTPRequest request)
{
this->pCallbacksLock->Lock();
this->callbacks.append(request);
this->pCallbacksLock->Unlock();
{
ke::AutoLock lock(&callbacks_);
this->callbacks.append(request);
}
}
void HTTPSessionManager::RemoveFinishedThreads()
{
// Do some quick "garbage collection" on finished threads
if (!this->threads.empty())
ke::AutoLock lock(&threads_);
if (threads_.Locked())
{
for (ke::LinkedList<IThreadHandle*>::iterator i(threads.begin()), end(threads.end()); i != end; ++i)
if (!this->threads.empty())
{
if ((*i) != NULL)
for (ke::LinkedList<IThreadHandle*>::iterator i(threads.begin()), end(threads.end()); i != end; ++i)
{
if ((*i)->GetState() == Thread_Done)
if ((*i) != NULL)
{
(*i)->DestroyThis();
i = this->threads.erase(i);
if ((*i)->GetState() == Thread_Done)
{
(*i)->DestroyThis();
i = this->threads.erase(i);
}
}
}
}

View File

@ -42,6 +42,7 @@
#include "IBaseDownloader.h"
#include <amtl/am-linkedlist.h>
#include <amtl/am-vector.h>
#include <amtl/am-thread-utils.h>
#include <string.h>
@ -168,7 +169,6 @@ public:
}
~HTTPSessionManager() {}
void Initialize();
void Shutdown();
void PluginUnloaded(IPlugin *plugin);
void RunFrame();
@ -213,11 +213,11 @@ private:
void AddCallback(HTTPRequest request);
static const unsigned int iMaxRequestsPerFrame = 20;
IMutex *pRequestsLock;
ke::ConditionVariable requests_;
ke::Vector<HTTPRequest> requests;
// NOTE: this needs no lock since it's only accessed from main thread
ke::ConditionVariable threads_;
ke::LinkedList<IThreadHandle*> threads;
IMutex *pCallbacksLock;
ke::ConditionVariable callbacks_;
ke::Vector<HTTPRequest> callbacks;
class HTTPAsyncRequestHandler : public IThread