00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023 #include <algorithm>
00024 using namespace std;
00025
00026
00027 #include <QCoreApplication>
00028 #include <QWaitCondition>
00029 #include <QMutexLocker>
00030 #include <QRunnable>
00031 #include <QMutex>
00032 #include <QList>
00033 #include <QPair>
00034 #include <QMap>
00035 #include <QSet>
00036
00037
00038 #include "mthreadpool.h"
00039 #include "mythlogging.h"
00040 #include "mythtimer.h"
00041 #include "logging.h"
00042 #include "mthread.h"
00043 #include "mythdb.h"
00044
00045 typedef QPair<QRunnable*,QString> MPoolEntry;
00046 typedef QList<MPoolEntry> MPoolQueue;
00047 typedef QMap<int, MPoolQueue> MPoolQueues;
00048
00049 class MPoolThread : public MThread
00050 {
00051 public:
00052 MPoolThread(MThreadPool &pool, int timeout) :
00053 MThread("PT"), m_pool(pool), m_expiry_timeout(timeout),
00054 m_do_run(true), m_reserved(false)
00055 {
00056 QMutexLocker locker(&s_lock);
00057 setObjectName(QString("PT%1").arg(s_thread_num));
00058 s_thread_num++;
00059 }
00060
00061 void run(void)
00062 {
00063 RunProlog();
00064
00065 MythTimer t;
00066 t.start();
00067 QMutexLocker locker(&m_lock);
00068 while (true)
00069 {
00070 if (m_do_run && !m_runnable)
00071 m_wait.wait(locker.mutex(), m_expiry_timeout+1);
00072
00073 if (!m_runnable)
00074 {
00075 m_do_run = false;
00076
00077 locker.unlock();
00078 m_pool.NotifyDone(this);
00079 locker.relock();
00080 break;
00081 }
00082
00083 if (!m_runnable_name.isEmpty())
00084 loggingRegisterThread(m_runnable_name);
00085
00086 bool autodelete = m_runnable->autoDelete();
00087 m_runnable->run();
00088 if (autodelete)
00089 delete m_runnable;
00090 if (m_reserved)
00091 m_pool.ReleaseThread();
00092 m_reserved = false;
00093 m_runnable = NULL;
00094
00095 loggingDeregisterThread();
00096 loggingRegisterThread(objectName());
00097
00098 GetMythDB()->GetDBManager()->PurgeIdleConnections(false);
00099 QCoreApplication::processEvents();
00100
00101 t.start();
00102
00103 if (m_do_run)
00104 {
00105 locker.unlock();
00106 m_pool.NotifyAvailable(this);
00107 locker.relock();
00108 }
00109 else
00110 {
00111 locker.unlock();
00112 m_pool.NotifyDone(this);
00113 locker.relock();
00114 break;
00115 }
00116 }
00117
00118 RunEpilog();
00119 }
00120
00121 bool SetRunnable(QRunnable *runnable, QString runnableName,
00122 bool reserved)
00123 {
00124 QMutexLocker locker(&m_lock);
00125 if (m_do_run && (m_runnable == NULL))
00126 {
00127 m_runnable = runnable;
00128 m_runnable_name = runnableName;
00129 m_reserved = reserved;
00130 m_wait.wakeAll();
00131 return true;
00132 }
00133 return false;
00134 }
00135
00136 void Shutdown(void)
00137 {
00138 QMutexLocker locker(&m_lock);
00139 m_do_run = false;
00140 m_wait.wakeAll();
00141 }
00142
00143 QMutex m_lock;
00144 QWaitCondition m_wait;
00145 MThreadPool &m_pool;
00146 int m_expiry_timeout;
00147 bool m_do_run;
00148 QString m_runnable_name;
00149 bool m_reserved;
00150
00151 static QMutex s_lock;
00152 static uint s_thread_num;
00153 };
00154 QMutex MPoolThread::s_lock;
00155 uint MPoolThread::s_thread_num = 0;
00156
00158
00159 class MThreadPoolPrivate
00160 {
00161 public:
00162 MThreadPoolPrivate(const QString &name) :
00163 m_name(name),
00164 m_running(true),
00165 m_expiry_timeout(120 * 1000),
00166 m_max_thread_count(QThread::idealThreadCount()),
00167 m_reserve_thread(0)
00168 {
00169 }
00170
00171 int GetRealMaxThread(void)
00172 {
00173 return max(m_max_thread_count,1) + m_reserve_thread;
00174 }
00175
00176 mutable QMutex m_lock;
00177 QString m_name;
00178 QWaitCondition m_wait;
00179 bool m_running;
00180 int m_expiry_timeout;
00181 int m_max_thread_count;
00182 int m_reserve_thread;
00183
00184 MPoolQueues m_run_queues;
00185 QSet<MPoolThread*> m_avail_threads;
00186 QSet<MPoolThread*> m_running_threads;
00187 QList<MPoolThread*> m_delete_threads;
00188
00189 static QMutex s_pool_lock;
00190 static MThreadPool *s_pool;
00191 static QList<MThreadPool*> s_all_pools;
00192 };
00193
00194 QMutex MThreadPoolPrivate::s_pool_lock(QMutex::Recursive);
00195 MThreadPool *MThreadPoolPrivate::s_pool = NULL;
00196 QList<MThreadPool*> MThreadPoolPrivate::s_all_pools;
00197
00199
00200 MThreadPool::MThreadPool(const QString &name) :
00201 m_priv(new MThreadPoolPrivate(name))
00202 {
00203 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00204 MThreadPoolPrivate::s_all_pools.push_back(this);
00205 }
00206
00207 MThreadPool::~MThreadPool()
00208 {
00209 Stop();
00210 DeletePoolThreads();
00211 {
00212 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00213 MThreadPoolPrivate::s_all_pools.removeAll(this);
00214 }
00215 delete m_priv;
00216 m_priv = NULL;
00217 }
00218
00219 void MThreadPool::Stop(void)
00220 {
00221 QMutexLocker locker(&m_priv->m_lock);
00222 m_priv->m_running = false;
00223 QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin();
00224 for (; it != m_priv->m_avail_threads.end(); ++it)
00225 (*it)->Shutdown();
00226 it = m_priv->m_running_threads.begin();
00227 for (; it != m_priv->m_running_threads.end(); ++it)
00228 (*it)->Shutdown();
00229 m_priv->m_wait.wakeAll();
00230 }
00231
00232 void MThreadPool::DeletePoolThreads(void)
00233 {
00234 waitForDone();
00235
00236 QMutexLocker locker(&m_priv->m_lock);
00237 QSet<MPoolThread*>::iterator it = m_priv->m_avail_threads.begin();
00238 for (; it != m_priv->m_avail_threads.end(); ++it)
00239 {
00240 m_priv->m_delete_threads.push_front(*it);
00241 }
00242 m_priv->m_avail_threads.clear();
00243
00244 while (!m_priv->m_delete_threads.empty())
00245 {
00246 MPoolThread *thread = m_priv->m_delete_threads.back();
00247 locker.unlock();
00248
00249 thread->wait();
00250
00251 locker.relock();
00252 delete thread;
00253 if (m_priv->m_delete_threads.back() == thread)
00254 m_priv->m_delete_threads.pop_back();
00255 else
00256 m_priv->m_delete_threads.removeAll(thread);
00257 }
00258 }
00259
00260 MThreadPool *MThreadPool::globalInstance(void)
00261 {
00262 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00263 if (!MThreadPoolPrivate::s_pool)
00264 MThreadPoolPrivate::s_pool = new MThreadPool("GlobalPool");
00265 return MThreadPoolPrivate::s_pool;
00266 }
00267
00268 void MThreadPool::StopAllPools(void)
00269 {
00270 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00271 QList<MThreadPool*>::iterator it;
00272 for (it = MThreadPoolPrivate::s_all_pools.begin();
00273 it != MThreadPoolPrivate::s_all_pools.end(); ++it)
00274 {
00275 (*it)->Stop();
00276 }
00277 }
00278
00279 void MThreadPool::ShutdownAllPools(void)
00280 {
00281 QMutexLocker locker(&MThreadPoolPrivate::s_pool_lock);
00282 QList<MThreadPool*>::iterator it;
00283 for (it = MThreadPoolPrivate::s_all_pools.begin();
00284 it != MThreadPoolPrivate::s_all_pools.end(); ++it)
00285 {
00286 (*it)->Stop();
00287 }
00288 for (it = MThreadPoolPrivate::s_all_pools.begin();
00289 it != MThreadPoolPrivate::s_all_pools.end(); ++it)
00290 {
00291 (*it)->DeletePoolThreads();
00292 }
00293 }
00294
00295 void MThreadPool::start(QRunnable *runnable, QString debugName, int priority)
00296 {
00297 QMutexLocker locker(&m_priv->m_lock);
00298 if (TryStartInternal(runnable, debugName, false))
00299 return;
00300
00301 MPoolQueues::iterator it = m_priv->m_run_queues.find(priority);
00302 if (it != m_priv->m_run_queues.end())
00303 {
00304 (*it).push_back(MPoolEntry(runnable,debugName));
00305 }
00306 else
00307 {
00308 MPoolQueue list;
00309 list.push_back(MPoolEntry(runnable,debugName));
00310 m_priv->m_run_queues[priority] = list;
00311 }
00312 }
00313
00314 void MThreadPool::startReserved(
00315 QRunnable *runnable, QString debugName, int waitForAvailMS)
00316 {
00317 QMutexLocker locker(&m_priv->m_lock);
00318 if (waitForAvailMS > 0 && m_priv->m_avail_threads.empty() &&
00319 m_priv->m_running_threads.size() >= m_priv->m_max_thread_count)
00320 {
00321 MythTimer t;
00322 t.start();
00323 int left = waitForAvailMS - t.elapsed();
00324 while (left > 0 && m_priv->m_avail_threads.empty() &&
00325 m_priv->m_running_threads.size() >= m_priv->m_max_thread_count)
00326 {
00327 m_priv->m_wait.wait(locker.mutex(), left);
00328 left = waitForAvailMS - t.elapsed();
00329 }
00330 }
00331 TryStartInternal(runnable, debugName, true);
00332 }
00333
00334
00335 bool MThreadPool::tryStart(QRunnable *runnable, QString debugName)
00336 {
00337 QMutexLocker locker(&m_priv->m_lock);
00338 return TryStartInternal(runnable, debugName, false);
00339 }
00340
00341 bool MThreadPool::TryStartInternal(
00342 QRunnable *runnable, QString debugName, bool reserved)
00343 {
00344 if (!m_priv->m_running)
00345 return false;
00346
00347 while (!m_priv->m_delete_threads.empty())
00348 {
00349 m_priv->m_delete_threads.back()->wait();
00350 delete m_priv->m_delete_threads.back();
00351 m_priv->m_delete_threads.pop_back();
00352 }
00353
00354 while (m_priv->m_avail_threads.begin() != m_priv->m_avail_threads.end())
00355 {
00356 MPoolThread *thread = *m_priv->m_avail_threads.begin();
00357 m_priv->m_avail_threads.erase(m_priv->m_avail_threads.begin());
00358 m_priv->m_running_threads.insert(thread);
00359 if (reserved)
00360 m_priv->m_reserve_thread++;
00361 if (thread->SetRunnable(runnable, debugName, reserved))
00362 {
00363 return true;
00364 }
00365 else
00366 {
00367 if (reserved)
00368 m_priv->m_reserve_thread--;
00369 thread->Shutdown();
00370 m_priv->m_running_threads.remove(thread);
00371 m_priv->m_delete_threads.push_front(thread);
00372 }
00373 }
00374
00375 if (reserved ||
00376 m_priv->m_running_threads.size() < m_priv->GetRealMaxThread())
00377 {
00378 if (reserved)
00379 m_priv->m_reserve_thread++;
00380 MPoolThread *thread = new MPoolThread(*this, m_priv->m_expiry_timeout);
00381 m_priv->m_running_threads.insert(thread);
00382 thread->SetRunnable(runnable, debugName, reserved);
00383 thread->start();
00384 return true;
00385 }
00386
00387 return false;
00388 }
00389
00390 void MThreadPool::NotifyAvailable(MPoolThread *thread)
00391 {
00392 QMutexLocker locker(&m_priv->m_lock);
00393
00394 if (!m_priv->m_running)
00395 {
00396 m_priv->m_running_threads.remove(thread);
00397 thread->Shutdown();
00398 m_priv->m_delete_threads.push_front(thread);
00399 m_priv->m_wait.wakeAll();
00400 return;
00401 }
00402
00403 MPoolQueues::iterator it = m_priv->m_run_queues.begin();
00404 if (it == m_priv->m_run_queues.end())
00405 {
00406 m_priv->m_running_threads.remove(thread);
00407 m_priv->m_avail_threads.insert(thread);
00408 m_priv->m_wait.wakeAll();
00409 return;
00410 }
00411
00412 MPoolEntry e = (*it).front();
00413 if (!thread->SetRunnable(e.first, e.second, false))
00414 {
00415 m_priv->m_running_threads.remove(thread);
00416 m_priv->m_wait.wakeAll();
00417 if (!TryStartInternal(e.first, e.second, false))
00418 {
00419 thread->Shutdown();
00420 m_priv->m_delete_threads.push_front(thread);
00421 return;
00422 }
00423 thread->Shutdown();
00424 m_priv->m_delete_threads.push_front(thread);
00425 }
00426
00427 (*it).pop_front();
00428 if ((*it).empty())
00429 m_priv->m_run_queues.erase(it);
00430 }
00431
00432 void MThreadPool::NotifyDone(MPoolThread *thread)
00433 {
00434 QMutexLocker locker(&m_priv->m_lock);
00435 m_priv->m_running_threads.remove(thread);
00436 m_priv->m_avail_threads.remove(thread);
00437 if (!m_priv->m_delete_threads.contains(thread))
00438 m_priv->m_delete_threads.push_front(thread);
00439 m_priv->m_wait.wakeAll();
00440 }
00441
00442 int MThreadPool::expiryTimeout(void) const
00443 {
00444 QMutexLocker locker(&m_priv->m_lock);
00445 return m_priv->m_expiry_timeout;
00446 }
00447
00448 void MThreadPool::setExpiryTimeout(int expiryTimeout)
00449 {
00450 QMutexLocker locker(&m_priv->m_lock);
00451 m_priv->m_expiry_timeout = expiryTimeout;
00452 }
00453
00454 int MThreadPool::maxThreadCount(void) const
00455 {
00456 QMutexLocker locker(&m_priv->m_lock);
00457 return m_priv->m_max_thread_count;
00458 }
00459
00460 void MThreadPool::setMaxThreadCount(int maxThreadCount)
00461 {
00462 QMutexLocker locker(&m_priv->m_lock);
00463 m_priv->m_max_thread_count = maxThreadCount;
00464 }
00465
00466 int MThreadPool::activeThreadCount(void) const
00467 {
00468 QMutexLocker locker(&m_priv->m_lock);
00469 return m_priv->m_avail_threads.size() + m_priv->m_running_threads.size();
00470 }
00471
00472
00473
00474
00475
00476
00477
00478
00479
00480
00481
00482
00483
00484
00485
00486
00487 void MThreadPool::ReleaseThread(void)
00488 {
00489 QMutexLocker locker(&m_priv->m_lock);
00490 if (m_priv->m_reserve_thread > 0)
00491 m_priv->m_reserve_thread--;
00492 }
00493
#if 0
// Debugging aid (compiled out): logs a title followed by the address of
// every thread in the set.
static void print_set(QString title, QSet<MPoolThread*> set)
{
    LOG(VB_GENERAL, LOG_INFO, title);

    QSet<MPoolThread*>::iterator member = set.begin();
    while (member != set.end())
    {
        LOG(VB_GENERAL, LOG_INFO,
            QString(" : 0x%1").arg((quint64)(*member),0,16));
        ++member;
    }

    LOG(VB_GENERAL, LOG_INFO, "");
}
#endif
00507
00508 void MThreadPool::waitForDone(void)
00509 {
00510 QMutexLocker locker(&m_priv->m_lock);
00511 while (true)
00512 {
00513 while (!m_priv->m_delete_threads.empty())
00514 {
00515 m_priv->m_delete_threads.back()->wait();
00516 delete m_priv->m_delete_threads.back();
00517 m_priv->m_delete_threads.pop_back();
00518 }
00519
00520 if (m_priv->m_running && !m_priv->m_run_queues.empty())
00521 {
00522 m_priv->m_wait.wait(locker.mutex());
00523 continue;
00524 }
00525
00526 QSet<MPoolThread*> working = m_priv->m_running_threads;
00527 working = working.subtract(m_priv->m_avail_threads);
00528 if (working.empty())
00529 break;
00530 m_priv->m_wait.wait(locker.mutex());
00531 }
00532 }
00533
00534