00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016 extern "C" {
00017 #include <signal.h>
00018 }
00019
00020 #include <qevent.h>
00021 #include <qapplication.h>
00022
00023 #include "weaver.h"
00024
00025 namespace KPIM {
00026 namespace ThreadWeaver {
00027
// Global switch for diagnostic output; starts enabled.
// NOTE(review): presumably consulted by the debug() helper declared elsewhere - confirm.
bool Debug (true);
// Global verbosity threshold for diagnostic output; starts at 2.
int DebugLevel (2);
00030
00031 Job::Job (QObject* parent, const char* name)
00032 : QObject (parent, name),
00033 m_finished (false),
00034 m_mutex (new QMutex (true) ),
00035 m_thread (0)
00036 {
00037 }
00038
00039 Job::~Job()
00040 {
00041 }
00042
00043 void Job::lock()
00044 {
00045 m_mutex->lock();
00046 }
00047
00048 void Job::unlock()
00049 {
00050 m_mutex->unlock();
00051 }
00052
00053 void Job::execute(Thread *th)
00054 {
00055 m_mutex->lock();
00056 m_thread = th;
00057 m_mutex->unlock();
00058
00059 run ();
00060
00061 m_mutex->lock();
00062 setFinished (true);
00063 m_thread = 0;
00064 m_mutex->unlock();
00065 }
00066
00067 Thread *Job::thread ()
00068 {
00069 QMutexLocker l (m_mutex);
00070 return m_thread;
00071 }
00072
00073 bool Job::isFinished() const
00074 {
00075 QMutexLocker l (m_mutex);
00076 return m_finished;
00077 }
00078
00079 void Job::setFinished(bool status)
00080 {
00081 QMutexLocker l (m_mutex);
00082 m_finished = status;
00083 }
00084
00085 void Job::processEvent (Event *e)
00086 {
00087 switch ( e->action() )
00088 {
00089 case Event::JobStarted:
00090 emit ( started() );
00091 break;
00092 case Event::JobFinished:
00093 emit ( done() );
00094 break;
00095 case Event::JobSPR:
00096 emit ( SPR () );
00097 m_wc->wakeOne ();
00098 break;
00099 case Event::JobAPR:
00100 emit ( APR () );
00101
00102 break;
00103 default:
00104 break;
00105 }
00106 }
00107
00108 void Job::triggerSPR ()
00109 {
00110 m_mutex->lock ();
00111 m_wc = new QWaitCondition;
00112 m_mutex->unlock ();
00113
00114 thread()->post (KPIM::ThreadWeaver::Event::JobSPR, this);
00115 m_wc->wait ();
00116
00117 m_mutex->lock ();
00118 delete m_wc;
00119 m_wc = 0;
00120 m_mutex->unlock ();
00121 }
00122
00123 void Job::triggerAPR ()
00124 {
00125 m_mutex->lock ();
00126 m_wc = new QWaitCondition;
00127 m_mutex->unlock ();
00128
00129 thread()->post (KPIM::ThreadWeaver::Event::JobAPR, this);
00130 m_wc->wait ();
00131 }
00132
00133 void Job::wakeAPR ()
00134 {
00135 QMutexLocker l(m_mutex);
00136 if ( m_wc!=0 )
00137 {
00138 m_wc->wakeOne ();
00139 delete m_wc;
00140 m_wc = 0;
00141 }
00142 }
00143
00144 const int Event::Type = QEvent::User + 1000;
00145
00146 Event::Event ( Action action, Thread *thread, Job *job)
00147 : QCustomEvent ( type () ),
00148 m_action (action),
00149 m_thread (thread),
00150 m_job (job)
00151 {
00152 }
00153
00154 const int Event::type ()
00155 {
00156 return Type;
00157 }
00158
00159 Thread* Event::thread () const
00160 {
00161 if ( m_thread != 0)
00162 {
00163 return m_thread;
00164 } else {
00165 return 0;
00166 }
00167 }
00168
00169 Job* Event::job () const
00170 {
00171 return m_job;
00172 }
00173
00174 Event::Action Event::action () const
00175 {
00176 return m_action;
00177 }
00178
00179 unsigned int Thread::sm_Id;
00180
00181 Thread::Thread (Weaver *parent)
00182 : QThread (),
00183 m_parent ( parent ),
00184 m_id ( makeId() )
00185 {
00186 }
00187
00188 Thread::~Thread()
00189 {
00190 }
00191
00192 unsigned int Thread::makeId()
00193 {
00194 static QMutex mutex;
00195 QMutexLocker l (&mutex);
00196
00197 return ++sm_Id;
00198 }
00199
00200 const unsigned int Thread::id() const
00201 {
00202 return m_id;
00203 }
00204
00205 void Thread::run()
00206 {
00207 Job *job = 0;
00208
00209 post ( Event::ThreadStarted );
00210
00211 while (true)
00212 {
00213 debug ( 3, "Thread::run [%u]: trying to execute the next job.\n", id() );
00214
00215 job = m_parent->applyForWork ( this, job );
00216
00217 if (job == 0)
00218 {
00219 break;
00220 } else {
00221 post ( Event::JobStarted, job );
00222 job->execute (this);
00223 post ( Event::JobFinished, job );
00224 }
00225 }
00226
00227 post ( Event::ThreadExiting );
00228 }
00229
00230 void Thread::post (Event::Action a, Job *j)
00231 {
00232 m_parent->post ( a, this, j);
00233 }
00234
00235 void Thread::msleep(unsigned long msec)
00236 {
00237 QThread::msleep(msec);
00238 }
00239
00240 Weaver::Weaver(QObject* parent, const char* name,
00241 int inventoryMin, int inventoryMax)
00242 : QObject(parent, name),
00243 m_active(0),
00244 m_inventoryMin(inventoryMin),
00245 m_inventoryMax(inventoryMax),
00246 m_shuttingDown(false),
00247 m_running (false),
00248 m_suspend (false),
00249 m_mutex ( new QMutex(true) )
00250 {
00251 lock();
00252
00253 for ( int count = 0; count < m_inventoryMin; ++count)
00254 {
00255 Thread *th = new Thread(this);
00256 m_inventory.append(th);
00257
00258 th->start();
00259
00260 emit (threadCreated (th) );
00261 }
00262
00263 unlock();
00264 }
00265
00266 Weaver::~Weaver()
00267 {
00268 lock();
00269
00270 debug ( 1, "Weaver dtor: destroying inventory.\n" );
00271
00272 m_shuttingDown = true;
00273
00274 unlock();
00275
00276 m_jobAvailable.wakeAll();
00277
00278
00279
00280
00281
00282
00283
00284
00285 for ( Thread *th = m_inventory.first(); th; th = m_inventory.next() )
00286 {
00287 if ( !th->finished() )
00288 {
00289 m_jobAvailable.wakeAll();
00290 th->wait();
00291 }
00292
00293 emit (threadDestroyed (th) );
00294 delete th;
00295
00296 }
00297
00298 m_inventory.clear();
00299
00300 delete m_mutex;
00301
00302 debug ( 1, "Weaver dtor: done\n" );
00303
00304 }
00305
00306 void Weaver::lock()
00307 {
00308 debug ( 3 , "Weaver::lock: lock (mutex is %s).\n",
00309 ( m_mutex->locked() ? "locked" : "not locked" ) );
00310 m_mutex->lock();
00311 }
00312
00313 void Weaver::unlock()
00314 {
00315 m_mutex->unlock();
00316
00317 debug ( 3 , "Weaver::unlock: unlock (mutex is %s).\n",
00318 ( m_mutex->locked() ? "locked" : "not locked" ) );
00319 }
00320
00321 int Weaver::threads () const
00322 {
00323 QMutexLocker l (m_mutex);
00324 return m_inventory.count ();
00325 }
00326
00327 void Weaver::enqueue(Job* job)
00328 {
00329 lock();
00330
00331 m_assignments.append(job);
00332 m_running = true;
00333
00334 unlock();
00335
00336 assignJobs();
00337 }
00338
00339 void Weaver::enqueue (QPtrList <Job> jobs)
00340 {
00341 lock();
00342
00343 for ( Job * job = jobs.first(); job; job = jobs.next() )
00344 {
00345 m_assignments.append (job);
00346 }
00347
00348 unlock();
00349
00350 assignJobs();
00351 }
00352
00353 bool Weaver::dequeue ( Job* job )
00354 {
00355 QMutexLocker l (m_mutex);
00356 return m_assignments.remove (job);
00357 }
00358
00359 void Weaver::dequeue ()
00360 {
00361 QMutexLocker l (m_mutex);
00362 m_assignments.clear();
00363 }
00364
00365 void Weaver::suspend (bool state)
00366 {
00367 lock();
00368
00369 if (state)
00370 {
00371
00372 m_suspend = true;
00373 if ( m_active == 0 && isEmpty() )
00374 {
00375 post (Event::Suspended);
00376 }
00377 } else {
00378 m_suspend = false;
00379
00380 assignJobs ();
00381 debug (2, "Weaver::suspend: queueing resumed.\n" );
00382 }
00383
00384 unlock();
00385 }
00386
00387 void Weaver::assignJobs()
00388 {
00389 m_jobAvailable.wakeAll();
00390 }
00391
00392 bool Weaver::event (QEvent *e )
00393 {
00394 if ( e->type() >= QEvent::User )
00395 {
00396
00397 if ( e->type() == Event::type() )
00398 {
00399 Event *event = (Event*) e;
00400
00401 switch (event->action() )
00402 {
00403 case Event::JobFinished:
00404 if ( event->job() !=0 )
00405 {
00406 emit (jobDone (event->job() ) );
00407 }
00408 break;
00409 case Event::Finished:
00410 emit ( finished() );
00411 break;
00412 case Event::Suspended:
00413 emit ( suspended() );
00414 break;
00415 case Event::ThreadSuspended:
00416 if (!m_shuttingDown )
00417 {
00418 emit (threadSuspended ( event->thread() ) );
00419 }
00420 break;
00421 case Event::ThreadBusy:
00422 if (!m_shuttingDown )
00423 {
00424 emit (threadBusy (event->thread() ) );
00425 }
00426 break;
00427 default:
00428 break;
00429 }
00430
00431 if ( event->job() !=0 )
00432 {
00433 event->job()->processEvent (event);
00434 }
00435 } else {
00436 debug ( 0, "Weaver::event: Strange: received unknown user event.\n" );
00437 }
00438 return true;
00439 } else {
00440
00441 return QObject::event ( e );
00442 }
00443 }
00444
00445 void Weaver::post (Event::Action a, Thread* t, Job* j)
00446 {
00447 Event *e = new Event ( a, t, j);
00448 QApplication::postEvent (this, e);
00449 }
00450
00451 bool Weaver::isEmpty() const
00452 {
00453 QMutexLocker l (m_mutex);
00454 return m_assignments.count()==0;
00455 }
00456
00457 Job* Weaver::applyForWork(Thread *th, Job* previous)
00458 {
00459 Job *rc = 0;
00460 bool lastjob = false;
00461 bool suspended = false;
00462
00463 while (true)
00464 {
00465 lock();
00466
00467 if (previous != 0)
00468 {
00469 --m_active;
00470
00471 debug ( 3, "Weaver::applyForWork: job done, %i jobs left, "
00472 "%i active jobs left.\n",
00473 queueLength(), m_active );
00474
00475 if ( m_active == 0 && isEmpty() )
00476 {
00477 lastjob = true;
00478 m_running = false;
00479 post (Event::Finished);
00480 debug ( 3, "Weaver::applyForWork: last job.\n" );
00481 }
00482
00483 if (m_active == 0 && m_suspend == true)
00484 {
00485 suspended = true;
00486 post (Event::Suspended);
00487 debug ( 2, "Weaver::applyForWork: queueing suspended.\n" );
00488 }
00489
00490 m_jobFinished.wakeOne();
00491 }
00492
00493 previous = 0;
00494
00495 if (m_shuttingDown == true)
00496 {
00497 unlock();
00498
00499 return 0;
00500 } else {
00501 if ( !isEmpty() && m_suspend == false )
00502 {
00503 rc = m_assignments.getFirst();
00504 m_assignments.removeFirst ();
00505 ++m_active;
00506
00507 debug ( 3, "Weaver::applyForWork: job assigned, "
00508 "%i jobs in queue (%i active).\n",
00509 m_assignments.count(), m_active );
00510 unlock();
00511
00512 post (Event::ThreadBusy, th);
00513
00514 return rc;
00515 } else {
00516 unlock();
00517
00518 post (Event::ThreadSuspended, th);
00519 m_jobAvailable.wait();
00520 }
00521 }
00522 }
00523 }
00524
00525 int Weaver::queueLength()
00526 {
00527 QMutexLocker l (m_mutex);
00528 return m_assignments.count();
00529 }
00530
00531 bool Weaver::isIdle () const
00532 {
00533 QMutexLocker l (m_mutex);
00534 return isEmpty() && m_active == 0;
00535 }
00536
00537 void Weaver::finish()
00538 {
00539 while ( !isIdle() )
00540 {
00541 debug (2, "Weaver::finish: not done, waiting.\n" );
00542 m_jobFinished.wait();
00543 }
00544 debug (1, "Weaver::finish: done.\n\n\n" );
00545 }
00546
00547 }
00548 }
00549
00550 #include "weaver.moc"