00001
00002 #include <unistd.h>
00003 #include <sys/types.h>
00004 #include <sys/stat.h>
00005 #include <iostream>
00006 #include <cstdlib>
00007 #include <fcntl.h>
00008 #include <pthread.h>
00009 using namespace std;
00010
00011 #include <QDateTime>
00012 #include <QFileInfo>
00013 #include <QRegExp>
00014 #include <QEvent>
00015 #include <QCoreApplication>
00016
00017 #include "mythconfig.h"
00018
00019 #include "exitcodes.h"
00020 #include "jobqueue.h"
00021 #include "programinfo.h"
00022 #include "mythcorecontext.h"
00023 #include "mythmiscutil.h"
00024 #include "previewgenerator.h"
00025 #include "compat.h"
00026 #include "recordingprofile.h"
00027 #include "recordinginfo.h"
00028 #include "mthread.h"
00029
00030 #include "mythdb.h"
00031 #include "mythdirs.h"
00032 #include "mythlogging.h"
00033
00034 #ifndef O_STREAMING
00035 #define O_STREAMING 0
00036 #endif
00037
00038 #ifndef O_LARGEFILE
00039 #define O_LARGEFILE 0
00040 #endif
00041
00042 #define LOC QString("JobQueue: ")
00043
00044 JobQueue::JobQueue(bool master) :
00045 m_hostname(gCoreContext->GetHostName()),
00046 jobsRunning(0),
00047 jobQueueCPU(0),
00048 m_pginfo(NULL),
00049 runningJobsLock(new QMutex(QMutex::Recursive)),
00050 isMaster(master),
00051 queueThread(new MThread("JobQueue", this)),
00052 processQueue(false)
00053 {
00054 jobQueueCPU = gCoreContext->GetNumSetting("JobQueueCPU", 0);
00055
00056 #ifndef USING_VALGRIND
00057 QMutexLocker locker(&queueThreadCondLock);
00058 processQueue = true;
00059 queueThread->start();
00060 #else
00061 LOG(VB_GENERAL, LOG_ERR, LOC +
00062 "The JobQueue has been disabled because "
00063 "you compiled with the --enable-valgrind option.");
00064 #endif // USING_VALGRIND
00065
00066 gCoreContext->addListener(this);
00067 }
00068
00069 JobQueue::~JobQueue(void)
00070 {
00071 queueThreadCondLock.lock();
00072 processQueue = false;
00073 queueThreadCond.wakeAll();
00074 queueThreadCondLock.unlock();
00075
00076 queueThread->wait();
00077 delete queueThread;
00078 queueThread = NULL;
00079
00080 gCoreContext->removeListener(this);
00081
00082 delete runningJobsLock;
00083 }
00084
00085 void JobQueue::customEvent(QEvent *e)
00086 {
00087 if ((MythEvent::Type)(e->type()) == MythEvent::MythEventMessage)
00088 {
00089 MythEvent *me = (MythEvent *)e;
00090 QString message = me->Message();
00091
00092 if (message.left(9) == "LOCAL_JOB")
00093 {
00094
00095
00096 QString msg;
00097 message = message.simplified();
00098 QStringList tokens = message.split(" ", QString::SkipEmptyParts);
00099 QString action = tokens[1];
00100 int jobID = -1;
00101
00102 if (tokens[2] == "ID")
00103 jobID = tokens[3].toInt();
00104 else
00105 {
00106 jobID = GetJobID(
00107 tokens[2].toInt(),
00108 tokens[3].toUInt(),
00109 QDateTime::fromString(tokens[4], Qt::ISODate));
00110 }
00111
00112 runningJobsLock->lock();
00113 if (!runningJobs.contains(jobID))
00114 {
00115 msg = QString("Unable to determine jobID for message: "
00116 "%1. Program will not be flagged.")
00117 .arg(message);
00118 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
00119 runningJobsLock->unlock();
00120 return;
00121 }
00122 runningJobsLock->unlock();
00123
00124 msg = QString("Received message '%1'").arg(message);
00125 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
00126
00127 if ((action == "STOP") ||
00128 (action == "PAUSE") ||
00129 (action == "RESTART") ||
00130 (action == "RESUME" ))
00131 {
00132 runningJobsLock->lock();
00133
00134 if (action == "STOP")
00135 runningJobs[jobID].flag = JOB_STOP;
00136 else if (action == "PAUSE")
00137 runningJobs[jobID].flag = JOB_PAUSE;
00138 else if (action == "RESUME")
00139 runningJobs[jobID].flag = JOB_RUN;
00140 else if (action == "RESTART")
00141 runningJobs[jobID].flag = JOB_RESTART;
00142
00143 runningJobsLock->unlock();
00144 }
00145 }
00146 }
00147 }
00148
00149 void JobQueue::run(void)
00150 {
00151 queueThreadCondLock.lock();
00152 queueThreadCond.wakeAll();
00153 queueThreadCondLock.unlock();
00154
00155 RecoverQueue();
00156
00157 queueThreadCondLock.lock();
00158 queueThreadCond.wait(&queueThreadCondLock, 10 * 1000);
00159 queueThreadCondLock.unlock();
00160
00161 ProcessQueue();
00162 }
00163
00164 void JobQueue::ProcessQueue(void)
00165 {
00166 LOG(VB_JOBQUEUE, LOG_INFO, LOC + "ProcessQueue() started");
00167
00168 QString logInfo;
00169 int jobID;
00170 int cmds;
00171 int flags;
00172 int status;
00173 QString hostname;
00174 int sleepTime;
00175
00176 QMap<int, int> jobStatus;
00177 int maxJobs;
00178 QString message;
00179 QMap<int, JobQueueEntry> jobs;
00180 bool atMax = false;
00181 bool inTimeWindow = true;
00182 bool startedJobAlready = false;
00183 QMap<int, RunningJobInfo>::Iterator rjiter;
00184
00185 QMutexLocker locker(&queueThreadCondLock);
00186 while (processQueue)
00187 {
00188 locker.unlock();
00189
00190 startedJobAlready = false;
00191 sleepTime = gCoreContext->GetNumSetting("JobQueueCheckFrequency", 30);
00192 maxJobs = gCoreContext->GetNumSetting("JobQueueMaxSimultaneousJobs", 3);
00193 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
00194 QString("Currently set to run up to %1 job(s) max.")
00195 .arg(maxJobs));
00196
00197 jobStatus.clear();
00198
00199 runningJobsLock->lock();
00200 for (rjiter = runningJobs.begin(); rjiter != runningJobs.end();
00201 ++rjiter)
00202 {
00203 if ((*rjiter).pginfo)
00204 (*rjiter).pginfo->UpdateInUseMark();
00205 }
00206 runningJobsLock->unlock();
00207
00208 jobsRunning = 0;
00209 GetJobsInQueue(jobs);
00210
00211 if (jobs.size())
00212 {
00213 inTimeWindow = InJobRunWindow();
00214 for (int x = 0; x < jobs.size(); x++)
00215 {
00216 status = jobs[x].status;
00217 hostname = jobs[x].hostname;
00218
00219 if (((status == JOB_RUNNING) ||
00220 (status == JOB_STARTING) ||
00221 (status == JOB_PAUSED)) &&
00222 (hostname == m_hostname))
00223 jobsRunning++;
00224 }
00225
00226 message = QString("Currently Running %1 jobs.")
00227 .arg(jobsRunning);
00228 if (!inTimeWindow)
00229 {
00230 message += QString(" Jobs in Queue, but we are outside of the "
00231 "Job Queue time window, no new jobs can be "
00232 "started.");
00233 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00234 }
00235 else if (jobsRunning >= maxJobs)
00236 {
00237 message += " (At Maximum, no new jobs can be started until "
00238 "a running job completes)";
00239
00240 if (!atMax)
00241 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00242
00243 atMax = true;
00244 }
00245 else
00246 {
00247 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00248 atMax = false;
00249 }
00250
00251
00252 for ( int x = 0;
00253 (x < jobs.size()) && (jobsRunning < maxJobs); x++)
00254 {
00255 jobID = jobs[x].id;
00256 cmds = jobs[x].cmds;
00257 flags = jobs[x].flags;
00258 status = jobs[x].status;
00259 hostname = jobs[x].hostname;
00260
00261 if (!jobs[x].chanid)
00262 logInfo = QString("jobID #%1").arg(jobID);
00263 else
00264 logInfo = QString("chanid %1 @ %2").arg(jobs[x].chanid)
00265 .arg(jobs[x].startts);
00266
00267
00268 if ((inTimeWindow) &&
00269 (!hostname.isEmpty()) &&
00270 (hostname != m_hostname))
00271 {
00272
00273
00274
00275 jobStatus[jobID] = status;
00276
00277 message = QString("Skipping '%1' job for %2, "
00278 "should run on '%3' instead")
00279 .arg(JobText(jobs[x].type)).arg(logInfo)
00280 .arg(hostname);
00281 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00282 continue;
00283 }
00284
00285
00286 if (inTimeWindow)
00287 {
00288 int otherJobID = GetRunningJobID(jobs[x].chanid,
00289 jobs[x].recstartts);
00290 if (otherJobID && (jobStatus.contains(otherJobID)) &&
00291 (!(jobStatus[otherJobID] & JOB_DONE)))
00292 {
00293 message =
00294 QString("Skipping '%1' job for %2, "
00295 "Job ID %3 is already running for "
00296 "this recording with a status of '%4'")
00297 .arg(JobText(jobs[x].type)).arg(logInfo)
00298 .arg(otherJobID)
00299 .arg(StatusText(jobStatus[otherJobID]));
00300 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00301 continue;
00302 }
00303 }
00304
00305 jobStatus[jobID] = status;
00306
00307
00308 if ((inTimeWindow) && (!AllowedToRun(jobs[x])))
00309 {
00310 message = QString("Skipping '%1' job for %2, "
00311 "not allowed to run on this backend.")
00312 .arg(JobText(jobs[x].type)).arg(logInfo);
00313 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00314 continue;
00315 }
00316
00317
00318 if (jobs[x].schedruntime > QDateTime::currentDateTime())
00319 {
00320 message = QString("Skipping '%1' job for %2, this job is "
00321 "not scheduled to run until %3.")
00322 .arg(JobText(jobs[x].type)).arg(logInfo)
00323 .arg(jobs[x].schedruntime
00324 .toString(Qt::ISODate));
00325 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00326 continue;
00327 }
00328
00329 if (cmds & JOB_STOP)
00330 {
00331
00332
00333 if (status != JOB_QUEUED) {
00334 message = QString("Stopping '%1' job for %2")
00335 .arg(JobText(jobs[x].type))
00336 .arg(logInfo);
00337 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00338
00339 runningJobsLock->lock();
00340 if (runningJobs.contains(jobID))
00341 runningJobs[jobID].flag = JOB_STOP;
00342 runningJobsLock->unlock();
00343
00344
00345 continue;
00346
00347
00348
00349
00350 } else {
00351 message = QString("Cancelling '%1' job for %2")
00352 .arg(JobText(jobs[x].type))
00353 .arg(logInfo);
00354 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00355
00356
00357
00358
00359
00360 if (!ChangeJobHost(jobID, m_hostname))
00361 {
00362 message = QString("Unable to claim '%1' job for %2")
00363 .arg(JobText(jobs[x].type))
00364 .arg(logInfo);
00365 LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
00366 continue;
00367 }
00368
00369 ChangeJobStatus(jobID, JOB_CANCELLED, "");
00370 ChangeJobCmds(jobID, JOB_RUN);
00371 continue;
00372 }
00373 }
00374
00375 if ((cmds & JOB_PAUSE) && (status != JOB_QUEUED))
00376 {
00377 message = QString("Pausing '%1' job for %2")
00378 .arg(JobText(jobs[x].type)).arg(logInfo);
00379 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00380
00381 runningJobsLock->lock();
00382 if (runningJobs.contains(jobID))
00383 runningJobs[jobID].flag = JOB_PAUSE;
00384 runningJobsLock->unlock();
00385
00386 ChangeJobCmds(jobID, JOB_RUN);
00387 continue;
00388 }
00389
00390 if ((cmds & JOB_RESTART) && (status != JOB_QUEUED))
00391 {
00392 message = QString("Restart '%1' job for %2")
00393 .arg(JobText(jobs[x].type)).arg(logInfo);
00394 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00395
00396 runningJobsLock->lock();
00397 if (runningJobs.contains(jobID))
00398 runningJobs[jobID].flag = JOB_RUN;
00399 runningJobsLock->unlock();
00400
00401 ChangeJobCmds(jobID, JOB_RUN);
00402 continue;
00403 }
00404
00405 if (status != JOB_QUEUED)
00406 {
00407
00408 if (hostname.isEmpty())
00409 {
00410 message = QString("Resetting '%1' job for %2 to %3 "
00411 "status, because no hostname is set.")
00412 .arg(JobText(jobs[x].type))
00413 .arg(logInfo)
00414 .arg(StatusText(JOB_QUEUED));
00415 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00416
00417 ChangeJobStatus(jobID, JOB_QUEUED, "");
00418 ChangeJobCmds(jobID, JOB_RUN);
00419 }
00420 else if (inTimeWindow)
00421 {
00422 message = QString("Skipping '%1' job for %2, "
00423 "current job status is '%3'")
00424 .arg(JobText(jobs[x].type))
00425 .arg(logInfo)
00426 .arg(StatusText(status));
00427 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00428 }
00429 continue;
00430 }
00431
00432
00433 if (startedJobAlready)
00434 continue;
00435
00436 if ((inTimeWindow) &&
00437 (hostname.isEmpty()) &&
00438 (!ChangeJobHost(jobID, m_hostname)))
00439 {
00440 message = QString("Unable to claim '%1' job for %2")
00441 .arg(JobText(jobs[x].type)).arg(logInfo);
00442 LOG(VB_JOBQUEUE, LOG_ERR, LOC + message);
00443 continue;
00444 }
00445
00446 if (!inTimeWindow)
00447 {
00448 message = QString("Skipping '%1' job for %2, "
00449 "current time is outside of the "
00450 "Job Queue processing window.")
00451 .arg(JobText(jobs[x].type)).arg(logInfo);
00452 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00453 continue;
00454 }
00455
00456 message = QString("Processing '%1' job for %2, "
00457 "current status is '%3'")
00458 .arg(JobText(jobs[x].type)).arg(logInfo)
00459 .arg(StatusText(status));
00460 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00461
00462 ProcessJob(jobs[x]);
00463
00464 startedJobAlready = true;
00465 }
00466 }
00467
00468 if (QCoreApplication::applicationName() == MYTH_APPNAME_MYTHJOBQUEUE)
00469 {
00470 if (jobsRunning > 0)
00471 {
00472 if (!(gCoreContext->IsBlockingClient()))
00473 {
00474 gCoreContext->BlockShutdown();
00475 LOG(VB_JOBQUEUE, LOG_INFO, QString("%1 jobs running. "
00476 "Blocking shutdown.").arg(jobsRunning));
00477 }
00478 }
00479 else
00480 {
00481 if (gCoreContext->IsBlockingClient())
00482 {
00483 gCoreContext->AllowShutdown();
00484 LOG(VB_JOBQUEUE, LOG_INFO, "No jobs running. "
00485 "Allowing shutdown.");
00486 }
00487 }
00488 }
00489
00490
00491 locker.relock();
00492 if (processQueue)
00493 {
00494 int st = (startedJobAlready) ? (5 * 1000) : (sleepTime * 1000);
00495 if (st > 0)
00496 queueThreadCond.wait(locker.mutex(), st);
00497 }
00498 }
00499 }
00500
00501 bool JobQueue::QueueRecordingJobs(const RecordingInfo &recinfo, int jobTypes)
00502 {
00503 if (jobTypes == JOB_NONE)
00504 jobTypes = recinfo.GetAutoRunJobs();
00505
00506 if (recinfo.IsCommercialFree())
00507 jobTypes &= (~JOB_COMMFLAG);
00508
00509 if (jobTypes != JOB_NONE)
00510 {
00511 QString jobHost = QString("");
00512
00513 if (gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0))
00514 jobHost = recinfo.GetHostname();
00515
00516 return JobQueue::QueueJobs(
00517 jobTypes, recinfo.GetChanID(), recinfo.GetRecordingStartTime(),
00518 "", "", jobHost);
00519 }
00520 else
00521 return false;
00522
00523 return true;
00524 }
00525
00526 bool JobQueue::QueueJob(int jobType, uint chanid, const QDateTime &recstartts,
00527 QString args, QString comment, QString host,
00528 int flags, int status, QDateTime schedruntime)
00529 {
00530 int tmpStatus = JOB_UNKNOWN;
00531 int tmpCmd = JOB_UNKNOWN;
00532 int jobID = -1;
00533 int chanidInt = -1;
00534
00535 if(!schedruntime.isValid())
00536 schedruntime = QDateTime::currentDateTime();
00537
00538 MSqlQuery query(MSqlQuery::InitCon());
00539
00540
00541 if (chanid)
00542 {
00543 query.prepare("SELECT status, id, cmds FROM jobqueue "
00544 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00545 "AND type = :JOBTYPE;");
00546 query.bindValue(":CHANID", chanid);
00547 query.bindValue(":STARTTIME", recstartts);
00548 query.bindValue(":JOBTYPE", jobType);
00549
00550 if (!query.exec())
00551 {
00552 MythDB::DBError("Error in JobQueue::QueueJob()", query);
00553 return false;
00554 }
00555 else
00556 {
00557 if (query.next())
00558 {
00559 tmpStatus = query.value(0).toInt();
00560 jobID = query.value(1).toInt();
00561 tmpCmd = query.value(2).toInt();
00562 }
00563 }
00564 switch (tmpStatus)
00565 {
00566 case JOB_UNKNOWN:
00567 break;
00568 case JOB_STARTING:
00569 case JOB_RUNNING:
00570 case JOB_PAUSED:
00571 case JOB_STOPPING:
00572 case JOB_ERRORING:
00573 case JOB_ABORTING:
00574 return false;
00575 default:
00576 DeleteJob(jobID);
00577 break;
00578 }
00579 if (! (tmpStatus & JOB_DONE) && (tmpCmd & JOB_STOP))
00580 return false;
00581
00582 chanidInt = chanid;
00583 }
00584
00585 if (host.isNull())
00586 host = QString("");
00587
00588 query.prepare("INSERT INTO jobqueue (chanid, starttime, inserttime, type, "
00589 "status, statustime, schedruntime, hostname, args, comment, "
00590 "flags) "
00591 "VALUES (:CHANID, :STARTTIME, now(), :JOBTYPE, :STATUS, "
00592 "now(), :SCHEDRUNTIME, :HOST, :ARGS, :COMMENT, :FLAGS);");
00593
00594 query.bindValue(":CHANID", chanidInt);
00595 query.bindValue(":STARTTIME", recstartts);
00596 query.bindValue(":JOBTYPE", jobType);
00597 query.bindValue(":STATUS", status);
00598 query.bindValue(":SCHEDRUNTIME", schedruntime);
00599 query.bindValue(":HOST", host);
00600 query.bindValue(":ARGS", args);
00601 query.bindValue(":COMMENT", comment);
00602 query.bindValue(":FLAGS", flags);
00603
00604 if (!query.exec())
00605 {
00606 MythDB::DBError("Error in JobQueue::StartJob()", query);
00607 return false;
00608 }
00609
00610 return true;
00611 }
00612
00613 bool JobQueue::QueueJobs(int jobTypes, uint chanid, const QDateTime &recstartts,
00614 QString args, QString comment, QString host)
00615 {
00616 if (gCoreContext->GetNumSetting("AutoTranscodeBeforeAutoCommflag", 0))
00617 {
00618 if (jobTypes & JOB_METADATA)
00619 QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
00620 if (jobTypes & JOB_TRANSCODE)
00621 QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host);
00622 if (jobTypes & JOB_COMMFLAG)
00623 QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
00624 }
00625 else
00626 {
00627 if (jobTypes & JOB_METADATA)
00628 QueueJob(JOB_METADATA, chanid, recstartts, args, comment, host);
00629 if (jobTypes & JOB_COMMFLAG)
00630 QueueJob(JOB_COMMFLAG, chanid, recstartts, args, comment, host);
00631 if (jobTypes & JOB_TRANSCODE)
00632 {
00633 QDateTime schedruntime = QDateTime::currentDateTime();
00634
00635 int defer = gCoreContext->GetNumSetting("DeferAutoTranscodeDays", 0);
00636 if (defer)
00637 {
00638 schedruntime = schedruntime.addDays(defer);
00639 schedruntime.setTime(QTime(0,0));
00640 }
00641
00642 QueueJob(JOB_TRANSCODE, chanid, recstartts, args, comment, host,
00643 0, JOB_QUEUED, schedruntime);
00644 }
00645 }
00646
00647 if (jobTypes & JOB_USERJOB1)
00648 QueueJob(JOB_USERJOB1, chanid, recstartts, args, comment, host);
00649 if (jobTypes & JOB_USERJOB2)
00650 QueueJob(JOB_USERJOB2, chanid, recstartts, args, comment, host);
00651 if (jobTypes & JOB_USERJOB3)
00652 QueueJob(JOB_USERJOB3, chanid, recstartts, args, comment, host);
00653 if (jobTypes & JOB_USERJOB4)
00654 QueueJob(JOB_USERJOB4, chanid, recstartts, args, comment, host);
00655
00656 return true;
00657 }
00658
00659 int JobQueue::GetJobID(int jobType, uint chanid, const QDateTime &recstartts)
00660 {
00661 MSqlQuery query(MSqlQuery::InitCon());
00662
00663 query.prepare("SELECT id FROM jobqueue "
00664 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00665 "AND type = :JOBTYPE;");
00666 query.bindValue(":CHANID", chanid);
00667 query.bindValue(":STARTTIME", recstartts);
00668 query.bindValue(":JOBTYPE", jobType);
00669
00670 if (!query.exec())
00671 {
00672 MythDB::DBError("Error in JobQueue::GetJobID()", query);
00673 return -1;
00674 }
00675 else
00676 {
00677 if (query.next())
00678 return query.value(0).toInt();
00679 }
00680
00681 return -1;
00682 }
00683
00684 bool JobQueue::GetJobInfoFromID(
00685 int jobID, int &jobType, uint &chanid, QDateTime &recstartts)
00686 {
00687 MSqlQuery query(MSqlQuery::InitCon());
00688
00689 query.prepare("SELECT type, chanid, starttime FROM jobqueue "
00690 "WHERE id = :ID;");
00691
00692 query.bindValue(":ID", jobID);
00693
00694 if (!query.exec())
00695 {
00696 MythDB::DBError("Error in JobQueue::GetJobInfoFromID()", query);
00697 return false;
00698 }
00699 else
00700 {
00701 if (query.next())
00702 {
00703 jobType = query.value(0).toInt();
00704 chanid = query.value(1).toUInt();
00705 recstartts = query.value(2).toDateTime();
00706 return true;
00707 }
00708 }
00709
00710 return false;
00711 }
00712
00713 bool JobQueue::GetJobInfoFromID(
00714 int jobID, int &jobType, uint &chanid, QString &recstartts)
00715 {
00716 QDateTime tmpStarttime;
00717
00718 bool result = JobQueue::GetJobInfoFromID(
00719 jobID, jobType, chanid, tmpStarttime);
00720
00721 if (result)
00722 recstartts = tmpStarttime.toString("yyyyMMddhhmmss");
00723
00724 return result;
00725 }
00726
00727 bool JobQueue::PauseJob(int jobID)
00728 {
00729 QString message = QString("GLOBAL_JOB PAUSE ID %1").arg(jobID);
00730 MythEvent me(message);
00731 gCoreContext->dispatch(me);
00732
00733 return ChangeJobCmds(jobID, JOB_PAUSE);
00734 }
00735
00736 bool JobQueue::ResumeJob(int jobID)
00737 {
00738 QString message = QString("GLOBAL_JOB RESUME ID %1").arg(jobID);
00739 MythEvent me(message);
00740 gCoreContext->dispatch(me);
00741
00742 return ChangeJobCmds(jobID, JOB_RESUME);
00743 }
00744
00745 bool JobQueue::RestartJob(int jobID)
00746 {
00747 QString message = QString("GLOBAL_JOB RESTART ID %1").arg(jobID);
00748 MythEvent me(message);
00749 gCoreContext->dispatch(me);
00750
00751 return ChangeJobCmds(jobID, JOB_RESTART);
00752 }
00753
00754 bool JobQueue::StopJob(int jobID)
00755 {
00756 QString message = QString("GLOBAL_JOB STOP ID %1").arg(jobID);
00757 MythEvent me(message);
00758 gCoreContext->dispatch(me);
00759
00760 return ChangeJobCmds(jobID, JOB_STOP);
00761 }
00762
00763 bool JobQueue::DeleteAllJobs(uint chanid, const QDateTime &recstartts)
00764 {
00765 MSqlQuery query(MSqlQuery::InitCon());
00766 QString message;
00767
00768 query.prepare("UPDATE jobqueue SET status = :CANCELLED "
00769 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00770 "AND status = :QUEUED;");
00771
00772 query.bindValue(":CANCELLED", JOB_CANCELLED);
00773 query.bindValue(":CHANID", chanid);
00774 query.bindValue(":STARTTIME", recstartts);
00775 query.bindValue(":QUEUED", JOB_QUEUED);
00776
00777 if (!query.exec())
00778 MythDB::DBError("Cancel Pending Jobs", query);
00779
00780 query.prepare("UPDATE jobqueue SET cmds = :CMD "
00781 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00782 "AND status <> :CANCELLED;");
00783 query.bindValue(":CMD", JOB_STOP);
00784 query.bindValue(":CHANID", chanid);
00785 query.bindValue(":STARTTIME", recstartts);
00786 query.bindValue(":CANCELLED", JOB_CANCELLED);
00787
00788 if (!query.exec())
00789 {
00790 MythDB::DBError("Stop Unfinished Jobs", query);
00791 return false;
00792 }
00793
00794
00795 bool jobsAreRunning = true;
00796 int totalSlept = 0;
00797 int maxSleep = 90;
00798 while (jobsAreRunning && totalSlept < maxSleep)
00799 {
00800 usleep(1000);
00801 query.prepare("SELECT id FROM jobqueue "
00802 "WHERE chanid = :CHANID and starttime = :STARTTIME "
00803 "AND status NOT IN "
00804 "(:FINISHED,:ABORTED,:ERRORED,:CANCELLED);");
00805 query.bindValue(":CHANID", chanid);
00806 query.bindValue(":STARTTIME", recstartts);
00807 query.bindValue(":FINISHED", JOB_FINISHED);
00808 query.bindValue(":ABORTED", JOB_ABORTED);
00809 query.bindValue(":ERRORED", JOB_ERRORED);
00810 query.bindValue(":CANCELLED", JOB_CANCELLED);
00811
00812 if (!query.exec())
00813 {
00814 MythDB::DBError("Stop Unfinished Jobs", query);
00815 return false;
00816 }
00817
00818 if (query.size() == 0)
00819 {
00820 jobsAreRunning = false;
00821 break;
00822 }
00823 else if ((totalSlept % 5) == 0)
00824 {
00825 message = QString("Waiting on %1 jobs still running for "
00826 "chanid %2 @ %3").arg(query.size())
00827 .arg(chanid).arg(recstartts.toString());
00828 LOG(VB_JOBQUEUE, LOG_INFO, LOC + message);
00829 }
00830
00831 sleep(1);
00832 totalSlept++;
00833 }
00834
00835 if (totalSlept <= maxSleep)
00836 {
00837 query.prepare("DELETE FROM jobqueue "
00838 "WHERE chanid = :CHANID AND starttime = :STARTTIME;");
00839 query.bindValue(":CHANID", chanid);
00840 query.bindValue(":STARTTIME", recstartts);
00841
00842 if (!query.exec())
00843 MythDB::DBError("Delete All Jobs", query);
00844 }
00845 else
00846 {
00847 query.prepare("SELECT id, type, status, comment FROM jobqueue "
00848 "WHERE chanid = :CHANID AND starttime = :STARTTIME "
00849 "AND status <> :CANCELLED ORDER BY id;");
00850
00851 query.bindValue(":CHANID", chanid);
00852 query.bindValue(":STARTTIME", recstartts);
00853 query.bindValue(":CANCELLED", JOB_CANCELLED);
00854
00855 if (!query.exec())
00856 {
00857 MythDB::DBError("Error in JobQueue::DeleteAllJobs(), Unable "
00858 "to query list of Jobs left in Queue.", query);
00859 return 0;
00860 }
00861
00862 LOG(VB_GENERAL, LOG_ERR, LOC +
00863 QString( "In DeleteAllJobs: There are Jobs "
00864 "left in the JobQueue that are still running for "
00865 "chanid %1 @ %2.").arg(chanid)
00866 .arg(recstartts.toString()));
00867
00868 while (query.next())
00869 {
00870 LOG(VB_GENERAL, LOG_ERR, LOC +
00871 QString("Job ID %1: '%2' with status '%3' and comment '%4'")
00872 .arg(query.value(0).toInt())
00873 .arg(JobText(query.value(1).toInt()))
00874 .arg(StatusText(query.value(2).toInt()))
00875 .arg(query.value(3).toString()));
00876 }
00877
00878 return false;
00879 }
00880
00881 return true;
00882 }
00883
00884 bool JobQueue::DeleteJob(int jobID)
00885 {
00886 if (jobID < 0)
00887 return false;
00888
00889 MSqlQuery query(MSqlQuery::InitCon());
00890
00891 query.prepare("DELETE FROM jobqueue WHERE id = :ID;");
00892
00893 query.bindValue(":ID", jobID);
00894
00895 if (!query.exec())
00896 {
00897 MythDB::DBError("Error in JobQueue::DeleteJob()", query);
00898 return false;
00899 }
00900
00901 return true;
00902 }
00903
00904 bool JobQueue::ChangeJobCmds(int jobID, int newCmds)
00905 {
00906 if (jobID < 0)
00907 return false;
00908
00909 MSqlQuery query(MSqlQuery::InitCon());
00910
00911 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE id = :ID;");
00912
00913 query.bindValue(":CMDS", newCmds);
00914 query.bindValue(":ID", jobID);
00915
00916 if (!query.exec())
00917 {
00918 MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
00919 return false;
00920 }
00921
00922 return true;
00923 }
00924
00925 bool JobQueue::ChangeJobCmds(int jobType, uint chanid,
00926 const QDateTime &recstartts, int newCmds)
00927 {
00928 MSqlQuery query(MSqlQuery::InitCon());
00929
00930 query.prepare("UPDATE jobqueue SET cmds = :CMDS WHERE type = :TYPE "
00931 "AND chanid = :CHANID AND starttime = :STARTTIME;");
00932
00933 query.bindValue(":CMDS", newCmds);
00934 query.bindValue(":TYPE", jobType);
00935 query.bindValue(":CHANID", chanid);
00936 query.bindValue(":STARTTIME", recstartts);
00937
00938 if (!query.exec())
00939 {
00940 MythDB::DBError("Error in JobQueue::ChangeJobCmds()", query);
00941 return false;
00942 }
00943
00944 return true;
00945 }
00946
00947 bool JobQueue::ChangeJobFlags(int jobID, int newFlags)
00948 {
00949 if (jobID < 0)
00950 return false;
00951
00952 MSqlQuery query(MSqlQuery::InitCon());
00953
00954 query.prepare("UPDATE jobqueue SET flags = :FLAGS WHERE id = :ID;");
00955
00956 query.bindValue(":FLAGS", newFlags);
00957 query.bindValue(":ID", jobID);
00958
00959 if (!query.exec())
00960 {
00961 MythDB::DBError("Error in JobQueue::ChangeJobFlags()", query);
00962 return false;
00963 }
00964
00965 return true;
00966 }
00967
00968 bool JobQueue::ChangeJobStatus(int jobID, int newStatus, QString comment)
00969 {
00970 if (jobID < 0)
00971 return false;
00972
00973 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobStatus(%1, %2, '%3')")
00974 .arg(jobID).arg(StatusText(newStatus)).arg(comment));
00975
00976 MSqlQuery query(MSqlQuery::InitCon());
00977
00978 query.prepare("UPDATE jobqueue SET status = :STATUS, comment = :COMMENT "
00979 "WHERE id = :ID AND status <> :NEWSTATUS;");
00980
00981 query.bindValue(":STATUS", newStatus);
00982 query.bindValue(":COMMENT", comment);
00983 query.bindValue(":ID", jobID);
00984 query.bindValue(":NEWSTATUS", newStatus);
00985
00986 if (!query.exec())
00987 {
00988 MythDB::DBError("Error in JobQueue::ChangeJobStatus()", query);
00989 return false;
00990 }
00991
00992 return true;
00993 }
00994
00995 bool JobQueue::ChangeJobComment(int jobID, QString comment)
00996 {
00997 if (jobID < 0)
00998 return false;
00999
01000 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("ChangeJobComment(%1, '%2')")
01001 .arg(jobID).arg(comment));
01002
01003 MSqlQuery query(MSqlQuery::InitCon());
01004
01005 query.prepare("UPDATE jobqueue SET comment = :COMMENT "
01006 "WHERE id = :ID;");
01007
01008 query.bindValue(":COMMENT", comment);
01009 query.bindValue(":ID", jobID);
01010
01011 if (!query.exec())
01012 {
01013 MythDB::DBError("Error in JobQueue::ChangeJobComment()", query);
01014 return false;
01015 }
01016
01017 return true;
01018 }
01019
01020 bool JobQueue::ChangeJobArgs(int jobID, QString args)
01021 {
01022 if (jobID < 0)
01023 return false;
01024
01025 MSqlQuery query(MSqlQuery::InitCon());
01026
01027 query.prepare("UPDATE jobqueue SET args = :ARGS "
01028 "WHERE id = :ID;");
01029
01030 query.bindValue(":ARGS", args);
01031 query.bindValue(":ID", jobID);
01032
01033 if (!query.exec())
01034 {
01035 MythDB::DBError("Error in JobQueue::ChangeJobArgs()", query);
01036 return false;
01037 }
01038
01039 return true;
01040 }
01041
01042 int JobQueue::GetRunningJobID(uint chanid, const QDateTime &recstartts)
01043 {
01044 runningJobsLock->lock();
01045 QMap<int, RunningJobInfo>::iterator it = runningJobs.begin();
01046 for (; it != runningJobs.end(); ++it)
01047 {
01048 RunningJobInfo jInfo = *it;
01049
01050 if ((jInfo.pginfo->GetChanID() == chanid) &&
01051 (jInfo.pginfo->GetRecordingStartTime() == recstartts))
01052 {
01053 runningJobsLock->unlock();
01054
01055 return jInfo.id;
01056 }
01057 }
01058 runningJobsLock->unlock();
01059
01060 return 0;
01061 }
01062
01063 bool JobQueue::IsJobRunning(int jobType,
01064 uint chanid, const QDateTime &recstartts)
01065 {
01066 int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
01067
01068 if ((tmpStatus != JOB_UNKNOWN) && (tmpStatus != JOB_QUEUED) &&
01069 (!(tmpStatus & JOB_DONE)))
01070 return true;
01071
01072 return false;
01073 }
01074
01075 bool JobQueue::IsJobRunning(int jobType, const ProgramInfo &pginfo)
01076 {
01077 return JobQueue::IsJobRunning(
01078 jobType, pginfo.GetChanID(), pginfo.GetRecordingStartTime());
01079 }
01080
01081 bool JobQueue::IsJobQueuedOrRunning(
01082 int jobType, uint chanid, const QDateTime &recstartts)
01083 {
01084 int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
01085
01086 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE)))
01087 return true;
01088
01089 return false;
01090 }
01091
01092 bool JobQueue::IsJobQueued(
01093 int jobType, uint chanid, const QDateTime &recstartts)
01094 {
01095 int tmpStatus = GetJobStatus(jobType, chanid, recstartts);
01096
01097 if (tmpStatus & JOB_QUEUED)
01098 return true;
01099
01100 return false;
01101 }
01102
01103 QString JobQueue::JobText(int jobType)
01104 {
01105 switch (jobType)
01106 {
01107 case JOB_TRANSCODE: return tr("Transcode");
01108 case JOB_COMMFLAG: return tr("Flag Commercials");
01109 case JOB_METADATA: return tr("Look up Metadata");
01110 }
01111
01112 if (jobType & JOB_USERJOB)
01113 {
01114 QString settingName =
01115 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
01116 return gCoreContext->GetSetting(settingName, settingName);
01117 }
01118
01119 return tr("Unknown Job");
01120 }
01121
01122 QString JobQueue::StatusText(int status)
01123 {
01124 switch (status)
01125 {
01126 #define JOBSTATUS_STATUSTEXT(A,B,C) case A: return C;
01127 JOBSTATUS_MAP(JOBSTATUS_STATUSTEXT)
01128 default: break;
01129 }
01130 return tr("Undefined");
01131 }
01132
01133 bool JobQueue::InJobRunWindow(int orStartsWithinMins)
01134 {
01135 QString queueStartTimeStr;
01136 QString queueEndTimeStr;
01137 QTime queueStartTime;
01138 QTime queueEndTime;
01139 QTime curTime = QTime::currentTime();
01140 bool inTimeWindow = false;
01141 orStartsWithinMins = orStartsWithinMins < 0 ? 0 : orStartsWithinMins;
01142
01143 queueStartTimeStr = gCoreContext->GetSetting("JobQueueWindowStart", "00:00");
01144 queueEndTimeStr = gCoreContext->GetSetting("JobQueueWindowEnd", "23:59");
01145
01146 queueStartTime = QTime::fromString(queueStartTimeStr, "hh:mm");
01147 if (!queueStartTime.isValid())
01148 {
01149 LOG(VB_GENERAL, LOG_ERR,
01150 QString("Invalid JobQueueWindowStart time '%1', using 00:00")
01151 .arg(queueStartTimeStr));
01152 queueStartTime = QTime(0, 0);
01153 }
01154
01155 queueEndTime = QTime::fromString(queueEndTimeStr, "hh:mm");
01156 if (!queueEndTime.isValid())
01157 {
01158 LOG(VB_GENERAL, LOG_ERR,
01159 QString("Invalid JobQueueWindowEnd time '%1', using 23:59")
01160 .arg(queueEndTimeStr));
01161 queueEndTime = QTime(23, 59);
01162 }
01163
01164 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01165 QString("Currently set to run new jobs from %1 to %2")
01166 .arg(queueStartTimeStr).arg(queueEndTimeStr));
01167
01168 if ((queueStartTime <= curTime) && (curTime < queueEndTime))
01169 {
01170 inTimeWindow = true;
01171 }
01172 else if ((queueStartTime > queueEndTime) &&
01173 ((curTime < queueEndTime) || (queueStartTime <= curTime)))
01174 {
01175 inTimeWindow = true;
01176 }
01177 else if (orStartsWithinMins > 0)
01178 {
01179
01180 if (curTime <= queueStartTime)
01181 {
01182
01183 if (queueStartTime.secsTo(curTime) <= (orStartsWithinMins * 60))
01184 {
01185 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01186 QString("Job run window will start within %1 minutes")
01187 .arg(orStartsWithinMins));
01188 inTimeWindow = true;
01189 }
01190 }
01191 else
01192 {
01193
01194 QDateTime curDateTime = QDateTime::currentDateTime();
01195 QDateTime startDateTime = QDateTime(QDate::currentDate(),
01196 queueStartTime).addDays(1);
01197
01198 if (curDateTime.secsTo(startDateTime) <= (orStartsWithinMins * 60))
01199 {
01200 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01201 QString("Job run window will start "
01202 "within %1 minutes (tomorrow)")
01203 .arg(orStartsWithinMins));
01204 inTimeWindow = true;
01205 }
01206 }
01207 }
01208
01209 return inTimeWindow;
01210 }
01211
01212 bool JobQueue::HasRunningOrPendingJobs(int startingWithinMins)
01213 {
01214
01215
01216 QMap<int, JobQueueEntry> jobs;
01217 QMap<int, JobQueueEntry>::Iterator it;
01218 QDateTime maxSchedRunTime = QDateTime::currentDateTime();
01219 int tmpStatus = 0;
01220 bool checkForQueuedJobs = (startingWithinMins <= 0
01221 || InJobRunWindow(startingWithinMins));
01222
01223 if (checkForQueuedJobs && startingWithinMins > 0) {
01224 maxSchedRunTime = maxSchedRunTime.addSecs(startingWithinMins * 60);
01225 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01226 QString("HasRunningOrPendingJobs: checking for jobs "
01227 "starting before: %1").arg(maxSchedRunTime.toString()));
01228 }
01229
01230 JobQueue::GetJobsInQueue(jobs, JOB_LIST_NOT_DONE);
01231
01232 if (jobs.size()) {
01233 for (it = jobs.begin(); it != jobs.end(); ++it)
01234 {
01235 tmpStatus = (*it).status;
01236 if (tmpStatus == JOB_RUNNING) {
01237 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01238 QString("HasRunningOrPendingJobs: found running job"));
01239 return true;
01240 }
01241
01242 if (checkForQueuedJobs) {
01243 if ((tmpStatus != JOB_UNKNOWN) && (!(tmpStatus & JOB_DONE))) {
01244 if (startingWithinMins <= 0) {
01245 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01246 "HasRunningOrPendingJobs: found pending job");
01247 return true;
01248 }
01249 else if ((*it).schedruntime <= maxSchedRunTime) {
01250 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01251 QString("HasRunningOrPendingJobs: found pending "
01252 "job scheduled to start at: %1")
01253 .arg((*it).schedruntime.toString()));
01254 return true;
01255 }
01256 }
01257 }
01258 }
01259 }
01260 return false;
01261 }
01262
01263
01264 int JobQueue::GetJobsInQueue(QMap<int, JobQueueEntry> &jobs, int findJobs)
01265 {
01266 JobQueueEntry thisJob;
01267 MSqlQuery query(MSqlQuery::InitCon());
01268 QDateTime recentDate = QDateTime::currentDateTime().addSecs(-4 * 3600);
01269 QString logInfo;
01270 int jobCount = 0;
01271 bool commflagWhileRecording =
01272 gCoreContext->GetNumSetting("AutoCommflagWhileRecording", 0);
01273
01274 jobs.clear();
01275
01276 query.prepare("SELECT j.id, j.chanid, j.starttime, j.inserttime, j.type, "
01277 "j.cmds, j.flags, j.status, j.statustime, j.hostname, "
01278 "j.args, j.comment, r.endtime, j.schedruntime "
01279 "FROM jobqueue j "
01280 "LEFT JOIN recorded r "
01281 " ON j.chanid = r.chanid AND j.starttime = r.starttime "
01282 "ORDER BY j.schedruntime, j.id;");
01283
01284 if (!query.exec())
01285 {
01286 MythDB::DBError("Error in JobQueue::GetJobs(), Unable to "
01287 "query list of Jobs in Queue.", query);
01288 return 0;
01289 }
01290
01291 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01292 QString("GetJobsInQueue: findJobs search bitmask %1, "
01293 "found %2 total jobs")
01294 .arg(findJobs).arg(query.size()));
01295
01296 while (query.next())
01297 {
01298 bool wantThisJob = false;
01299
01300 thisJob.id = query.value(0).toInt();
01301 thisJob.recstartts = query.value(2).toDateTime();
01302 thisJob.schedruntime = query.value(13).toDateTime();
01303 thisJob.type = query.value(4).toInt();
01304 thisJob.status = query.value(7).toInt();
01305 thisJob.statustime = query.value(8).toDateTime();
01306 thisJob.startts = thisJob.recstartts.toString("yyyyMMddhhmmss");
01307
01308
01309 if (query.value(1).toInt() == -1)
01310 {
01311 thisJob.chanid = 0;
01312 logInfo = QString("jobID #%1").arg(thisJob.id);
01313 }
01314 else
01315 {
01316 thisJob.chanid = query.value(1).toUInt();
01317 logInfo = QString("chanid %1 @ %2").arg(thisJob.chanid)
01318 .arg(thisJob.startts);
01319 }
01320
01321 if ((query.value(12).toDateTime() > QDateTime::currentDateTime()) &&
01322 ((!commflagWhileRecording) ||
01323 ((thisJob.type != JOB_COMMFLAG) &&
01324 (thisJob.type != JOB_METADATA))))
01325 {
01326 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01327 QString("GetJobsInQueue: Ignoring '%1' Job "
01328 "for %2 in %3 state. Endtime in future.")
01329 .arg(JobText(thisJob.type))
01330 .arg(logInfo).arg(StatusText(thisJob.status)));
01331 continue;
01332 }
01333
01334 if ((findJobs & JOB_LIST_ALL) ||
01335 ((findJobs & JOB_LIST_DONE) &&
01336 (thisJob.status & JOB_DONE)) ||
01337 ((findJobs & JOB_LIST_NOT_DONE) &&
01338 (!(thisJob.status & JOB_DONE))) ||
01339 ((findJobs & JOB_LIST_ERROR) &&
01340 (thisJob.status == JOB_ERRORED)) ||
01341 ((findJobs & JOB_LIST_RECENT) &&
01342 (thisJob.statustime > recentDate)))
01343 wantThisJob = true;
01344
01345 if (!wantThisJob)
01346 {
01347 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01348 QString("GetJobsInQueue: Ignore '%1' Job for %2 in %3 state.")
01349 .arg(JobText(thisJob.type))
01350 .arg(logInfo).arg(StatusText(thisJob.status)));
01351 continue;
01352 }
01353
01354 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01355 QString("GetJobsInQueue: Found '%1' Job for %2 in %3 state.")
01356 .arg(JobText(thisJob.type))
01357 .arg(logInfo).arg(StatusText(thisJob.status)));
01358
01359 thisJob.inserttime = query.value(3).toDateTime();
01360 thisJob.cmds = query.value(5).toInt();
01361 thisJob.flags = query.value(6).toInt();
01362 thisJob.hostname = query.value(9).toString();
01363 thisJob.args = query.value(10).toString();
01364 thisJob.comment = query.value(11).toString();
01365
01366 if ((thisJob.type & JOB_USERJOB) &&
01367 (UserJobTypeToIndex(thisJob.type) == 0))
01368 {
01369 thisJob.type = JOB_NONE;
01370 LOG(VB_JOBQUEUE, LOG_INFO, LOC +
01371 QString("GetJobsInQueue: Unknown Job Type: %1")
01372 .arg(thisJob.type));
01373 }
01374
01375 if (thisJob.type != JOB_NONE)
01376 jobs[jobCount++] = thisJob;
01377 }
01378
01379 return jobCount;
01380 }
01381
01382 bool JobQueue::ChangeJobHost(int jobID, QString newHostname)
01383 {
01384 MSqlQuery query(MSqlQuery::InitCon());
01385
01386 if (!newHostname.isEmpty())
01387 {
01388 query.prepare("UPDATE jobqueue SET hostname = :NEWHOSTNAME "
01389 "WHERE hostname = :EMPTY AND id = :ID;");
01390 query.bindValue(":NEWHOSTNAME", newHostname);
01391 query.bindValue(":EMPTY", "");
01392 query.bindValue(":ID", jobID);
01393 }
01394 else
01395 {
01396 query.prepare("UPDATE jobqueue SET hostname = :EMPTY "
01397 "WHERE id = :ID;");
01398 query.bindValue(":EMPTY", "");
01399 query.bindValue(":ID", jobID);
01400 }
01401
01402 if (!query.exec())
01403 {
01404 MythDB::DBError(QString("Error in JobQueue::ChangeJobHost(), "
01405 "Unable to set hostname to '%1' for "
01406 "job %2.").arg(newHostname).arg(jobID),
01407 query);
01408 return false;
01409 }
01410
01411 if (query.numRowsAffected() > 0)
01412 return true;
01413
01414 return false;
01415 }
01416
01417 bool JobQueue::AllowedToRun(JobQueueEntry job)
01418 {
01419 QString allowSetting;
01420
01421 if ((!job.hostname.isEmpty()) &&
01422 (job.hostname != m_hostname))
01423 return false;
01424
01425 if (job.type & JOB_USERJOB)
01426 {
01427 allowSetting =
01428 QString("JobAllowUserJob%1").arg(UserJobTypeToIndex(job.type));
01429 }
01430 else
01431 {
01432 switch (job.type)
01433 {
01434 case JOB_TRANSCODE: allowSetting = "JobAllowTranscode";
01435 break;
01436 case JOB_COMMFLAG: allowSetting = "JobAllowCommFlag";
01437 break;
01438 case JOB_METADATA: allowSetting = "JobAllowMetadata";
01439 break;
01440 default: return false;
01441 }
01442 }
01443
01444 if (gCoreContext->GetNumSetting(allowSetting, 1))
01445 return true;
01446
01447 return false;
01448 }
01449
01450 enum JobCmds JobQueue::GetJobCmd(int jobID)
01451 {
01452 MSqlQuery query(MSqlQuery::InitCon());
01453
01454 query.prepare("SELECT cmds FROM jobqueue WHERE id = :ID;");
01455
01456 query.bindValue(":ID", jobID);
01457
01458 if (query.exec())
01459 {
01460 if (query.next())
01461 return (enum JobCmds)query.value(0).toInt();
01462 }
01463 else
01464 {
01465 MythDB::DBError("Error in JobQueue::GetJobCmd()", query);
01466 }
01467
01468 return JOB_RUN;
01469 }
01470
01471 QString JobQueue::GetJobArgs(int jobID)
01472 {
01473 MSqlQuery query(MSqlQuery::InitCon());
01474
01475 query.prepare("SELECT args FROM jobqueue WHERE id = :ID;");
01476
01477 query.bindValue(":ID", jobID);
01478
01479 if (query.exec())
01480 {
01481 if (query.next())
01482 return query.value(0).toString();
01483 }
01484 else
01485 {
01486 MythDB::DBError("Error in JobQueue::GetJobArgs()", query);
01487 }
01488
01489 return QString("");
01490 }
01491
01492 enum JobFlags JobQueue::GetJobFlags(int jobID)
01493 {
01494 MSqlQuery query(MSqlQuery::InitCon());
01495
01496 query.prepare("SELECT flags FROM jobqueue WHERE id = :ID;");
01497
01498 query.bindValue(":ID", jobID);
01499
01500 if (query.exec())
01501 {
01502 if (query.next())
01503 return (enum JobFlags)query.value(0).toInt();
01504 }
01505 else
01506 {
01507 MythDB::DBError("Error in JobQueue::GetJobFlags()", query);
01508 }
01509
01510 return JOB_NO_FLAGS;
01511 }
01512
01513 enum JobStatus JobQueue::GetJobStatus(int jobID)
01514 {
01515 MSqlQuery query(MSqlQuery::InitCon());
01516
01517 query.prepare("SELECT status FROM jobqueue WHERE id = :ID;");
01518
01519 query.bindValue(":ID", jobID);
01520
01521 if (query.exec())
01522 {
01523 if (query.next())
01524 return (enum JobStatus)query.value(0).toInt();
01525 }
01526 else
01527 {
01528 MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
01529 }
01530 return JOB_UNKNOWN;
01531 }
01532
01533 enum JobStatus JobQueue::GetJobStatus(
01534 int jobType, uint chanid, const QDateTime &recstartts)
01535 {
01536 MSqlQuery query(MSqlQuery::InitCon());
01537
01538 query.prepare("SELECT status FROM jobqueue WHERE type = :TYPE "
01539 "AND chanid = :CHANID AND starttime = :STARTTIME;");
01540
01541 query.bindValue(":TYPE", jobType);
01542 query.bindValue(":CHANID", chanid);
01543 query.bindValue(":STARTTIME", recstartts);
01544
01545 if (query.exec())
01546 {
01547 if (query.next())
01548 return (enum JobStatus)query.value(0).toInt();
01549 }
01550 else
01551 {
01552 MythDB::DBError("Error in JobQueue::GetJobStatus()", query);
01553 }
01554 return JOB_UNKNOWN;
01555 }
01556
01557 void JobQueue::RecoverQueue(bool justOld)
01558 {
01559 QMap<int, JobQueueEntry> jobs;
01560 QString msg;
01561 QString logInfo;
01562
01563 msg = QString("RecoverQueue: Checking for unfinished jobs to "
01564 "recover.");
01565 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
01566
01567 GetJobsInQueue(jobs);
01568
01569 if (jobs.size())
01570 {
01571 QMap<int, JobQueueEntry>::Iterator it;
01572 QDateTime oldDate = QDateTime::currentDateTime().addDays(-1);
01573 QString hostname = gCoreContext->GetHostName();
01574 int tmpStatus;
01575 int tmpCmds;
01576
01577 for (it = jobs.begin(); it != jobs.end(); ++it)
01578 {
01579 tmpCmds = (*it).cmds;
01580 tmpStatus = (*it).status;
01581
01582 if (!(*it).chanid)
01583 logInfo = QString("jobID #%1").arg((*it).id);
01584 else
01585 logInfo = QString("chanid %1 @ %2").arg((*it).chanid)
01586 .arg((*it).startts);
01587
01588 if (((tmpStatus == JOB_STARTING) ||
01589 (tmpStatus == JOB_RUNNING) ||
01590 (tmpStatus == JOB_PAUSED) ||
01591 (tmpCmds & JOB_STOP) ||
01592 (tmpStatus == JOB_STOPPING)) &&
01593 (((!justOld) &&
01594 ((*it).hostname == hostname)) ||
01595 ((*it).statustime < oldDate)))
01596 {
01597 msg = QString("RecoverQueue: Recovering '%1' for %2 "
01598 "from '%3' state.")
01599 .arg(JobText((*it).type))
01600 .arg(logInfo).arg(StatusText((*it).status));
01601 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
01602
01603 ChangeJobStatus((*it).id, JOB_QUEUED, "");
01604 ChangeJobCmds((*it).id, JOB_RUN);
01605 if (!gCoreContext->GetNumSetting("JobsRunOnRecordHost", 0))
01606 ChangeJobHost((*it).id, "");
01607 }
01608 else
01609 {
01610 #if 0
01611 msg = QString("RecoverQueue: Ignoring '%1' for %2 "
01612 "in '%3' state.")
01613 .arg(JobText((*it).type))
01614 .arg(logInfo).arg(StatusText((*it).status));
01615 LOG(VB_JOBQUEUE, LOG_INFO, LOC + msg);
01616 #endif
01617 }
01618 }
01619 }
01620 }
01621
01622 void JobQueue::CleanupOldJobsInQueue()
01623 {
01624 MSqlQuery delquery(MSqlQuery::InitCon());
01625 QDateTime donePurgeDate = QDateTime::currentDateTime().addDays(-2);
01626 QDateTime errorsPurgeDate = QDateTime::currentDateTime().addDays(-4);
01627
01628 delquery.prepare("DELETE FROM jobqueue "
01629 "WHERE (status in (:FINISHED, :ABORTED, :CANCELLED) "
01630 "AND statustime < :DONEPURGEDATE) "
01631 "OR (status in (:ERRORED) "
01632 "AND statustime < :ERRORSPURGEDATE) ");
01633 delquery.bindValue(":FINISHED", JOB_FINISHED);
01634 delquery.bindValue(":ABORTED", JOB_ABORTED);
01635 delquery.bindValue(":CANCELLED", JOB_CANCELLED);
01636 delquery.bindValue(":ERRORED", JOB_ERRORED);
01637 delquery.bindValue(":DONEPURGEDATE", donePurgeDate);
01638 delquery.bindValue(":ERRORSPURGEDATE", errorsPurgeDate);
01639
01640 if (!delquery.exec())
01641 {
01642 MythDB::DBError("JobQueue::CleanupOldJobsInQueue: Error deleting "
01643 "old finished jobs.", delquery);
01644 }
01645 }
01646
01647 void JobQueue::ProcessJob(JobQueueEntry job)
01648 {
01649 int jobID = job.id;
01650 QString name = QString("jobqueue%1%2").arg(jobID).arg(random());
01651
01652 if (!MSqlQuery::testDBConnection())
01653 {
01654 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
01655 "ProcessJob(): Unable to open database connection");
01656 return;
01657 }
01658
01659 ChangeJobStatus(jobID, JOB_PENDING);
01660 ProgramInfo *pginfo = NULL;
01661
01662 if (job.chanid)
01663 {
01664 pginfo = new ProgramInfo(job.chanid, job.recstartts);
01665
01666 if (!pginfo->GetChanID())
01667 {
01668 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
01669 QString("Unable to retrieve program info for chanid %1 @ %2")
01670 .arg(job.chanid).arg(job.recstartts.toString()));
01671
01672 ChangeJobStatus(jobID, JOB_ERRORED,
01673 "Unable to retrieve Program Info from database");
01674
01675 delete pginfo;
01676
01677 return;
01678 }
01679
01680 pginfo->SetPathname(pginfo->GetPlaybackURL());
01681 }
01682
01683
01684 runningJobsLock->lock();
01685
01686 ChangeJobStatus(jobID, JOB_STARTING);
01687 RunningJobInfo jInfo;
01688 jInfo.type = job.type;
01689 jInfo.id = jobID;
01690 jInfo.flag = JOB_RUN;
01691 jInfo.desc = GetJobDescription(job.type);
01692 jInfo.command = GetJobCommand(jobID, job.type, pginfo);
01693 jInfo.pginfo = pginfo;
01694
01695 runningJobs[jobID] = jInfo;
01696
01697 if (pginfo)
01698 pginfo->MarkAsInUse(true, kJobQueueInUseID);
01699
01700 if (pginfo && pginfo->GetRecordingGroup() == "Deleted")
01701 {
01702 ChangeJobStatus(jobID, JOB_CANCELLED,
01703 "Program has been deleted");
01704 RemoveRunningJob(jobID);
01705 }
01706 else if ((job.type == JOB_TRANSCODE) ||
01707 (runningJobs[jobID].command == "mythtranscode"))
01708 {
01709 StartChildJob(TranscodeThread, jobID);
01710 }
01711 else if ((job.type == JOB_COMMFLAG) ||
01712 (runningJobs[jobID].command == "mythcommflag"))
01713 {
01714 StartChildJob(FlagCommercialsThread, jobID);
01715 }
01716 else if ((job.type == JOB_METADATA) ||
01717 (runningJobs[jobID].command == "mythmetadatalookup"))
01718 {
01719 StartChildJob(MetadataLookupThread, jobID);
01720 }
01721 else if (job.type & JOB_USERJOB)
01722 {
01723 StartChildJob(UserJobThread, jobID);
01724 }
01725 else
01726 {
01727 ChangeJobStatus(jobID, JOB_ERRORED,
01728 "UNKNOWN JobType, unable to process!");
01729 RemoveRunningJob(jobID);
01730 }
01731
01732 runningJobsLock->unlock();
01733 }
01734
01735 void JobQueue::StartChildJob(void *(*ChildThreadRoutine)(void *), int jobID)
01736 {
01737 JobThreadStruct *jts = new JobThreadStruct;
01738 jts->jq = this;
01739 jts->jobID = jobID;
01740
01741 pthread_t childThread;
01742 pthread_attr_t attr;
01743 pthread_attr_init(&attr);
01744 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
01745 pthread_create(&childThread, &attr, ChildThreadRoutine, jts);
01746 pthread_attr_destroy(&attr);
01747 }
01748
01749 QString JobQueue::GetJobDescription(int jobType)
01750 {
01751 if (jobType == JOB_TRANSCODE)
01752 return "Transcode";
01753 else if (jobType == JOB_COMMFLAG)
01754 return "Commercial Detection";
01755 else if (!(jobType & JOB_USERJOB))
01756 return "Unknown Job";
01757
01758 QString descSetting =
01759 QString("UserJobDesc%1").arg(UserJobTypeToIndex(jobType));
01760
01761 return gCoreContext->GetSetting(descSetting, "Unknown Job");
01762 }
01763
01764 QString JobQueue::GetJobCommand(int id, int jobType, ProgramInfo *tmpInfo)
01765 {
01766 QString command;
01767 MSqlQuery query(MSqlQuery::InitCon());
01768
01769 if (jobType == JOB_TRANSCODE)
01770 {
01771 command = gCoreContext->GetSetting("JobQueueTranscodeCommand");
01772 if (command.trimmed().isEmpty())
01773 command = "mythtranscode";
01774
01775 if (command == "mythtranscode")
01776 return command;
01777 }
01778 else if (jobType == JOB_COMMFLAG)
01779 {
01780 command = gCoreContext->GetSetting("JobQueueCommFlagCommand");
01781 if (command.trimmed().isEmpty())
01782 command = "mythcommflag";
01783
01784 if (command == "mythcommflag")
01785 return command;
01786 }
01787 else if (jobType & JOB_USERJOB)
01788 {
01789 command = gCoreContext->GetSetting(
01790 QString("UserJob%1").arg(UserJobTypeToIndex(jobType)), "");
01791 }
01792
01793 if (!command.isEmpty())
01794 {
01795 command.replace("%JOBID%", QString("%1").arg(id));
01796 }
01797
01798 if (!command.isEmpty() && tmpInfo)
01799 {
01800 tmpInfo->SubstituteMatches(command);
01801
01802 command.replace("%VERBOSELEVEL%", QString("%1").arg(verboseMask));
01803 command.replace("%VERBOSEMODE%", QString("%1").arg(logPropagateArgs));
01804
01805 uint transcoder = tmpInfo->QueryTranscoderID();
01806 command.replace("%TRANSPROFILE%",
01807 (RecordingProfile::TranscoderAutodetect == transcoder) ?
01808 "autodetect" : QString::number(transcoder));
01809 }
01810
01811 return command;
01812 }
01813
01814 void JobQueue::RemoveRunningJob(int id)
01815 {
01816 runningJobsLock->lock();
01817
01818 if (runningJobs.contains(id))
01819 {
01820 ProgramInfo *pginfo = runningJobs[id].pginfo;
01821 if (pginfo)
01822 {
01823 pginfo->MarkAsInUse(false, kJobQueueInUseID);
01824 delete pginfo;
01825 }
01826
01827 runningJobs.remove(id);
01828 }
01829
01830 runningJobsLock->unlock();
01831 }
01832
01833 QString JobQueue::PrettyPrint(off_t bytes)
01834 {
01835
01836
01837 static const struct {
01838 const char *suffix;
01839 unsigned int max;
01840 int precision;
01841 } pptab[] = {
01842 { "bytes", 9999, 0 },
01843 { "kB", 999, 0 },
01844 { "MB", 999, 1 },
01845 { "GB", 999, 1 },
01846 { "TB", 999, 1 },
01847 { "PB", 999, 1 },
01848 { "EB", 999, 1 },
01849 { "ZB", 999, 1 },
01850 { "YB", 0, 0 },
01851 };
01852 unsigned int ii;
01853 float fbytes = bytes;
01854
01855 ii = 0;
01856 while (pptab[ii].max && fbytes > pptab[ii].max) {
01857 fbytes /= 1024;
01858 ii++;
01859 }
01860
01861 return QString("%1 %2")
01862 .arg(fbytes, 0, 'f', pptab[ii].precision)
01863 .arg(pptab[ii].suffix);
01864 }
01865
01866 void *JobQueue::TranscodeThread(void *param)
01867 {
01868 JobThreadStruct *jts = (JobThreadStruct *)param;
01869 JobQueue *jq = jts->jq;
01870
01871 MThread::ThreadSetup(QString("Transcode_%1").arg(jts->jobID));
01872 jq->DoTranscodeThread(jts->jobID);
01873 MThread::ThreadCleanup();
01874
01875 delete jts;
01876
01877 return NULL;
01878 }
01879
01880 void JobQueue::DoTranscodeThread(int jobID)
01881 {
01882
01883 runningJobsLock->lock();
01884 if (!runningJobs[jobID].pginfo)
01885 {
01886 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
01887 "The JobQueue cannot currently transcode files that do not "
01888 "have a chanid/starttime in the recorded table.");
01889 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
01890 RemoveRunningJob(jobID);
01891 runningJobsLock->unlock();
01892 return;
01893 }
01894
01895 ProgramInfo *program_info = runningJobs[jobID].pginfo;
01896 runningJobsLock->unlock();
01897
01898 ChangeJobStatus(jobID, JOB_RUNNING);
01899
01900
01901 program_info->Reload();
01902
01903 bool useCutlist = program_info->HasCutlist() &&
01904 !!(GetJobFlags(jobID) & JOB_USE_CUTLIST);
01905
01906 uint transcoder = program_info->QueryTranscoderID();
01907 QString profilearg =
01908 (RecordingProfile::TranscoderAutodetect == transcoder) ?
01909 "autodetect" : QString::number(transcoder);
01910
01911 QString path;
01912 QString command;
01913
01914 runningJobsLock->lock();
01915 if (runningJobs[jobID].command == "mythtranscode")
01916 {
01917 path = GetInstallPrefix() + "/bin/mythtranscode";
01918 command = QString("%1 -j %2 --profile %3")
01919 .arg(path).arg(jobID).arg(profilearg);
01920 if (useCutlist)
01921 command += " --honorcutlist";
01922 command += logPropagateArgs;
01923 }
01924 else
01925 {
01926 command = runningJobs[jobID].command;
01927
01928 QStringList tokens = command.split(" ", QString::SkipEmptyParts);
01929 if (!tokens.empty())
01930 path = tokens[0];
01931 }
01932 runningJobsLock->unlock();
01933
01934 if (jobQueueCPU < 2)
01935 {
01936 myth_nice(17);
01937 myth_ioprio((0 == jobQueueCPU) ? 8 : 7);
01938 }
01939
01940 QString transcoderName;
01941 if (transcoder == RecordingProfile::TranscoderAutodetect)
01942 {
01943 transcoderName = "Autodetect";
01944 }
01945 else
01946 {
01947 MSqlQuery query(MSqlQuery::InitCon());
01948 query.prepare("SELECT name FROM recordingprofiles WHERE id = :ID;");
01949 query.bindValue(":ID", transcoder);
01950 if (query.exec() && query.next())
01951 {
01952 transcoderName = query.value(0).toString();
01953 }
01954 else
01955 {
01956
01957 transcoderName = QString("Autodetect(%1)").arg(transcoder);
01958 }
01959 }
01960
01961 QString msg;
01962 bool retry = true;
01963 int retrylimit = 3;
01964 while (retry)
01965 {
01966 retry = false;
01967
01968 ChangeJobStatus(jobID, JOB_STARTING);
01969 program_info->SaveTranscodeStatus(TRANSCODING_RUNNING);
01970
01971 QString filename = program_info->GetPlaybackURL(false, true);
01972
01973 long long filesize = 0;
01974 long long origfilesize = QFileInfo(filename).size();
01975
01976 QString msg = QString("Transcode %1")
01977 .arg(StatusText(GetJobStatus(jobID)));
01978
01979 QString detailstr = QString("%1: %2 (%3)")
01980 .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
01981 .arg(transcoderName)
01982 .arg(PrettyPrint(origfilesize));
01983 QByteArray details = detailstr.toLocal8Bit();
01984
01985 LOG(VB_GENERAL, LOG_INFO, LOC + QString("%1 for %2")
01986 .arg(msg).arg(details.constData()));
01987
01988 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
01989 .arg(command));
01990
01991 GetMythDB()->GetDBManager()->CloseDatabases();
01992 uint result = myth_system(command);
01993 int status = GetJobStatus(jobID);
01994
01995 if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
01996 (result == GENERIC_EXIT_CMD_NOT_FOUND))
01997 {
01998 ChangeJobStatus(jobID, JOB_ERRORED,
01999 "ERROR: Unable to find mythtranscode, check backend logs.");
02000 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED);
02001
02002 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
02003 detailstr = QString("%1: %2 does not exist or is not executable")
02004 .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
02005 .arg(path);
02006 details = detailstr.toLocal8Bit();
02007
02008 LOG(VB_GENERAL, LOG_ERR, LOC +
02009 QString("%1 for %2").arg(msg).arg(details.constData()));
02010 }
02011 else if (result == GENERIC_EXIT_RESTART && retrylimit > 0)
02012 {
02013 LOG(VB_JOBQUEUE, LOG_INFO, LOC + "Transcode command restarting");
02014 retry = true;
02015 retrylimit--;
02016
02017 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED);
02018 }
02019 else
02020 {
02021 if (status == JOB_FINISHED)
02022 {
02023 ChangeJobStatus(jobID, JOB_FINISHED, "Finished.");
02024 retry = false;
02025
02026 filename = program_info->GetPlaybackURL(false, true);
02027 QFileInfo st(filename);
02028
02029 if (st.exists())
02030 {
02031 filesize = st.size();
02032
02033 QString comment = QString("%1: %2 => %3")
02034 .arg(transcoderName)
02035 .arg(PrettyPrint(origfilesize))
02036 .arg(PrettyPrint(filesize));
02037 ChangeJobComment(jobID, comment);
02038
02039 if (filesize > 0)
02040 program_info->SaveFilesize(filesize);
02041
02042 details = (QString("%1: %2 (%3)")
02043 .arg(program_info->toString(
02044 ProgramInfo::kTitleSubtitle))
02045 .arg(transcoderName)
02046 .arg(PrettyPrint(filesize))).toLocal8Bit();
02047 }
02048 else
02049 {
02050 QString comment =
02051 QString("could not stat '%1'").arg(filename);
02052
02053 ChangeJobStatus(jobID, JOB_FINISHED, comment);
02054
02055 details = (QString("%1: %2")
02056 .arg(program_info->toString(
02057 ProgramInfo::kTitleSubtitle))
02058 .arg(comment)).toLocal8Bit();
02059 }
02060
02061 program_info->SaveTranscodeStatus(TRANSCODING_COMPLETE);
02062 }
02063 else
02064 {
02065 program_info->SaveTranscodeStatus(TRANSCODING_NOT_TRANSCODED);
02066
02067 QString comment =
02068 QString("exit status %1, job status was \"%2\"")
02069 .arg(result)
02070 .arg(StatusText(status));
02071
02072 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02073
02074 details = (QString("%1: %2 (%3)")
02075 .arg(program_info->toString(
02076 ProgramInfo::kTitleSubtitle))
02077 .arg(transcoderName)
02078 .arg(comment)).toLocal8Bit().constData();
02079 }
02080
02081 msg = QString("Transcode %1").arg(StatusText(GetJobStatus(jobID)));
02082 LOG(VB_GENERAL, LOG_INFO, LOC + msg + ": " + details);
02083 }
02084 }
02085
02086 if (retrylimit == 0)
02087 {
02088 LOG(VB_JOBQUEUE, LOG_ERR, LOC + "Retry limit exceeded for transcoder, "
02089 "setting job status to errored.");
02090 ChangeJobStatus(jobID, JOB_ERRORED, "Retry limit exceeded");
02091 }
02092
02093 RemoveRunningJob(jobID);
02094 }
02095
02096 void *JobQueue::MetadataLookupThread(void *param)
02097 {
02098 JobThreadStruct *jts = (JobThreadStruct *)param;
02099 JobQueue *jq = jts->jq;
02100
02101 MThread::ThreadSetup(QString("Metadata_%1").arg(jts->jobID));
02102 jq->DoMetadataLookupThread(jts->jobID);
02103 MThread::ThreadCleanup();
02104
02105 delete jts;
02106
02107 return NULL;
02108 }
02109
02110 void JobQueue::DoMetadataLookupThread(int jobID)
02111 {
02112
02113 runningJobsLock->lock();
02114 if (!runningJobs[jobID].pginfo)
02115 {
02116 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
02117 "The JobQueue cannot currently perform lookups for items which do "
02118 "not have a chanid/starttime in the recorded table.");
02119 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
02120 RemoveRunningJob(jobID);
02121 runningJobsLock->unlock();
02122 return;
02123 }
02124
02125 ProgramInfo *program_info = runningJobs[jobID].pginfo;
02126 runningJobsLock->unlock();
02127
02128 QString detailstr = QString("%1 recorded from channel %3")
02129 .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
02130 .arg(program_info->toString(ProgramInfo::kRecordingKey));
02131 QByteArray details = detailstr.toLocal8Bit();
02132
02133 if (!MSqlQuery::testDBConnection())
02134 {
02135 QString msg = QString("Metadata Lookup failed. Could not open "
02136 "new database connection for %1. "
02137 "Program cannot be looked up.")
02138 .arg(details.constData());
02139 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02140
02141 ChangeJobStatus(jobID, JOB_ERRORED,
02142 "Could not open new database connection for "
02143 "metadata lookup.");
02144
02145 delete program_info;
02146 return;
02147 }
02148
02149 QString msg = tr("Metadata Lookup Starting");
02150 LOG(VB_GENERAL, LOG_INFO,
02151 LOC + "Metadata Lookup Starting for " + detailstr);
02152
02153 uint retVal = 0;
02154 QString path;
02155 QString command;
02156
02157 path = GetInstallPrefix() + "/bin/mythmetadatalookup";
02158 command = QString("%1 -j %2")
02159 .arg(path).arg(jobID);
02160 command += logPropagateArgs;
02161
02162 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
02163 .arg(command));
02164
02165 GetMythDB()->GetDBManager()->CloseDatabases();
02166 retVal = myth_system(command);
02167 int priority = LOG_NOTICE;
02168 QString comment;
02169
02170 runningJobsLock->lock();
02171
02172 if ((retVal == GENERIC_EXIT_DAEMONIZING_ERROR) ||
02173 (retVal == GENERIC_EXIT_CMD_NOT_FOUND))
02174 {
02175 comment = tr("Unable to find mythmetadatalookup");
02176 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02177 priority = LOG_WARNING;
02178 }
02179 else if (runningJobs[jobID].flag == JOB_STOP)
02180 {
02181 comment = tr("Aborted by user");
02182 ChangeJobStatus(jobID, JOB_ABORTED, comment);
02183 priority = LOG_WARNING;
02184 }
02185 else if (retVal == GENERIC_EXIT_NO_RECORDING_DATA)
02186 {
02187 comment = tr("Unable to open file or init decoder");
02188 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02189 priority = LOG_WARNING;
02190 }
02191 else if (retVal >= GENERIC_EXIT_NOT_OK)
02192 {
02193 comment = tr("Failed with exit status %1").arg(retVal);
02194 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02195 priority = LOG_WARNING;
02196 }
02197 else
02198 {
02199 comment = tr("Metadata Lookup Complete.");
02200 ChangeJobStatus(jobID, JOB_FINISHED, comment);
02201
02202 program_info->SendUpdateEvent();
02203 }
02204
02205 msg = tr("Metadata Lookup %1", "Job ID")
02206 .arg(StatusText(GetJobStatus(jobID)));
02207
02208 if (!comment.isEmpty())
02209 {
02210 detailstr += QString(" (%1)").arg(comment);
02211 details = detailstr.toLocal8Bit();
02212 }
02213
02214 if (priority <= LOG_WARNING)
02215 LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData());
02216
02217 RemoveRunningJob(jobID);
02218 runningJobsLock->unlock();
02219 }
02220
02221 void *JobQueue::FlagCommercialsThread(void *param)
02222 {
02223 JobThreadStruct *jts = (JobThreadStruct *)param;
02224 JobQueue *jq = jts->jq;
02225
02226 MThread::ThreadSetup(QString("Commflag_%1").arg(jts->jobID));
02227 jq->DoFlagCommercialsThread(jts->jobID);
02228 MThread::ThreadCleanup();
02229
02230 delete jts;
02231
02232 return NULL;
02233 }
02234
02235 void JobQueue::DoFlagCommercialsThread(int jobID)
02236 {
02237
02238 runningJobsLock->lock();
02239 if (!runningJobs[jobID].pginfo)
02240 {
02241 LOG(VB_JOBQUEUE, LOG_ERR, LOC +
02242 "The JobQueue cannot currently commflag files that do not "
02243 "have a chanid/starttime in the recorded table.");
02244 ChangeJobStatus(jobID, JOB_ERRORED, "ProgramInfo data not found");
02245 RemoveRunningJob(jobID);
02246 runningJobsLock->unlock();
02247 return;
02248 }
02249
02250 ProgramInfo *program_info = runningJobs[jobID].pginfo;
02251 runningJobsLock->unlock();
02252
02253 QString detailstr = QString("%1 recorded from channel %3")
02254 .arg(program_info->toString(ProgramInfo::kTitleSubtitle))
02255 .arg(program_info->toString(ProgramInfo::kRecordingKey));
02256 QByteArray details = detailstr.toLocal8Bit();
02257
02258 if (!MSqlQuery::testDBConnection())
02259 {
02260 QString msg = QString("Commercial Detection failed. Could not open "
02261 "new database connection for %1. "
02262 "Program cannot be flagged.")
02263 .arg(details.constData());
02264 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02265
02266 ChangeJobStatus(jobID, JOB_ERRORED,
02267 "Could not open new database connection for "
02268 "commercial detector.");
02269
02270 delete program_info;
02271 return;
02272 }
02273
02274 QString msg = tr("Commercial Detection Starting");
02275 LOG(VB_GENERAL, LOG_INFO,
02276 LOC + "Commercial Detection Starting for " + detailstr);
02277
02278 uint breaksFound = 0;
02279 QString path;
02280 QString command;
02281
02282 runningJobsLock->lock();
02283 if (runningJobs[jobID].command == "mythcommflag")
02284 {
02285 path = GetInstallPrefix() + "/bin/mythcommflag";
02286 command = QString("%1 -j %2 --noprogress")
02287 .arg(path).arg(jobID);
02288 command += logPropagateArgs;
02289 }
02290 else
02291 {
02292 command = runningJobs[jobID].command;
02293 QStringList tokens = command.split(" ", QString::SkipEmptyParts);
02294 if (!tokens.empty())
02295 path = tokens[0];
02296 }
02297 runningJobsLock->unlock();
02298
02299 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
02300 .arg(command));
02301
02302 GetMythDB()->GetDBManager()->CloseDatabases();
02303 breaksFound = myth_system(command, kMSLowExitVal);
02304 int priority = LOG_NOTICE;
02305 QString comment;
02306
02307 runningJobsLock->lock();
02308
02309 if ((breaksFound == GENERIC_EXIT_DAEMONIZING_ERROR) ||
02310 (breaksFound == GENERIC_EXIT_CMD_NOT_FOUND))
02311 {
02312 comment = tr("Unable to find mythcommflag");
02313 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02314 priority = LOG_WARNING;
02315 }
02316 else if (runningJobs[jobID].flag == JOB_STOP)
02317 {
02318 comment = tr("Aborted by user");
02319 ChangeJobStatus(jobID, JOB_ABORTED, comment);
02320 priority = LOG_WARNING;
02321 }
02322 else if (breaksFound == GENERIC_EXIT_NO_RECORDING_DATA)
02323 {
02324 comment = tr("Unable to open file or init decoder");
02325 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02326 priority = LOG_WARNING;
02327 }
02328 else if (breaksFound >= GENERIC_EXIT_NOT_OK)
02329 {
02330 comment = tr("Failed with exit status %1").arg(breaksFound);
02331 ChangeJobStatus(jobID, JOB_ERRORED, comment);
02332 priority = LOG_WARNING;
02333 }
02334 else
02335 {
02336 comment = tr("%n commercial break(s)", "", breaksFound);
02337 ChangeJobStatus(jobID, JOB_FINISHED, comment);
02338
02339 program_info->SendUpdateEvent();
02340
02341 if (!program_info->IsLocal())
02342 program_info->SetPathname(program_info->GetPlaybackURL(false,true));
02343 if (program_info->IsLocal())
02344 {
02345 PreviewGenerator *pg = new PreviewGenerator(
02346 program_info, QString(), PreviewGenerator::kLocal);
02347 pg->Run();
02348 pg->deleteLater();
02349 }
02350 }
02351
02352 msg = tr("Commercial Detection %1", "Job ID")
02353 .arg(StatusText(GetJobStatus(jobID)));
02354
02355 if (!comment.isEmpty())
02356 {
02357 detailstr += QString(" (%1)").arg(comment);
02358 details = detailstr.toLocal8Bit();
02359 }
02360
02361 if (priority <= LOG_WARNING)
02362 LOG(VB_GENERAL, LOG_ERR, LOC + msg + ": " + details.constData());
02363
02364 RemoveRunningJob(jobID);
02365 runningJobsLock->unlock();
02366 }
02367
02368 void *JobQueue::UserJobThread(void *param)
02369 {
02370 JobThreadStruct *jts = (JobThreadStruct *)param;
02371 JobQueue *jq = jts->jq;
02372
02373 MThread::ThreadSetup(QString("UserJob_%1").arg(jts->jobID));
02374 jq->DoUserJobThread(jts->jobID);
02375 MThread::ThreadCleanup();
02376
02377 delete jts;
02378
02379 return NULL;
02380 }
02381
02382 void JobQueue::DoUserJobThread(int jobID)
02383 {
02384 runningJobsLock->lock();
02385 ProgramInfo *pginfo = runningJobs[jobID].pginfo;
02386 QString jobDesc = runningJobs[jobID].desc;
02387 QString command = runningJobs[jobID].command;
02388 runningJobsLock->unlock();
02389
02390 ChangeJobStatus(jobID, JOB_RUNNING);
02391
02392 QString msg;
02393
02394 if (pginfo)
02395 {
02396 msg = QString("Started %1 for %2 recorded from channel %3")
02397 .arg(jobDesc)
02398 .arg(pginfo->toString(ProgramInfo::kTitleSubtitle))
02399 .arg(pginfo->toString(ProgramInfo::kRecordingKey));
02400 }
02401 else
02402 msg = QString("Started %1 for jobID %2").arg(jobDesc).arg(jobID);
02403
02404 LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
02405
02406 switch (jobQueueCPU)
02407 {
02408 case 0: myth_nice(17);
02409 myth_ioprio(8);
02410 break;
02411 case 1: myth_nice(10);
02412 myth_ioprio(7);
02413 break;
02414 case 2:
02415 default: break;
02416 }
02417
02418 LOG(VB_JOBQUEUE, LOG_INFO, LOC + QString("Running command: '%1'")
02419 .arg(command));
02420 GetMythDB()->GetDBManager()->CloseDatabases();
02421 uint result = myth_system(command);
02422
02423 if ((result == GENERIC_EXIT_DAEMONIZING_ERROR) ||
02424 (result == GENERIC_EXIT_CMD_NOT_FOUND))
02425 {
02426 msg = QString("User Job '%1' failed, unable to find "
02427 "executable, check your PATH and backend logs.")
02428 .arg(command);
02429 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02430 LOG(VB_GENERAL, LOG_NOTICE, LOC + QString("Current PATH: '%1'")
02431 .arg(getenv("PATH")));
02432
02433 ChangeJobStatus(jobID, JOB_ERRORED,
02434 "ERROR: Unable to find executable, check backend logs.");
02435 }
02436 else if (result != 0)
02437 {
02438 msg = QString("User Job '%1' failed.").arg(command);
02439 LOG(VB_GENERAL, LOG_ERR, LOC + msg);
02440
02441 ChangeJobStatus(jobID, JOB_ERRORED,
02442 "ERROR: User Job returned non-zero, check logs.");
02443 }
02444 else
02445 {
02446 if (pginfo)
02447 {
02448 msg = QString("Finished %1 for %2 recorded from channel %3")
02449 .arg(jobDesc)
02450 .arg(pginfo->toString(ProgramInfo::kTitleSubtitle))
02451 .arg(pginfo->toString(ProgramInfo::kRecordingKey));
02452 }
02453 else
02454 msg = QString("Finished %1 for jobID %2").arg(jobDesc).arg(jobID);
02455
02456 LOG(VB_GENERAL, LOG_INFO, LOC + QString(msg.toLocal8Bit().constData()));
02457
02458 ChangeJobStatus(jobID, JOB_FINISHED, "Successfully Completed.");
02459
02460 if (pginfo)
02461 pginfo->SendUpdateEvent();
02462 }
02463
02464 RemoveRunningJob(jobID);
02465 }
02466
02467 int JobQueue::UserJobTypeToIndex(int jobType)
02468 {
02469 if (jobType & JOB_USERJOB)
02470 {
02471 int x = ((jobType & JOB_USERJOB)>> 8);
02472 int bits = 1;
02473 while ((x != 0) && ((x & 0x01) == 0))
02474 {
02475 bits++;
02476 x = x >> 1;
02477 }
02478 if ( bits > 4 )
02479 return JOB_NONE;
02480
02481 return bits;
02482 }
02483 return JOB_NONE;
02484 }
02485
02486