00001 #include <algorithm>
00002 using namespace std;
00003
00004 #include "DeviceReadBuffer.h"
00005 #include "mythcorecontext.h"
00006 #include "mythbaseutil.h"
00007 #include "mythlogging.h"
00008 #include "tspacket.h"
00009 #include "mthread.h"
00010 #include "compat.h"
00011
00012 #ifndef USING_MINGW
00013 #include <sys/poll.h>
00014 #endif
00015
00017 #define REPORT_RING_STATS 0
00018
00019 #define LOC QString("DevRdB(%1): ").arg(videodevice)
00020
00021 DeviceReadBuffer::DeviceReadBuffer(
00022 DeviceReaderCB *cb, bool use_poll, bool error_exit_on_poll_timeout)
00023 : MThread("DeviceReadBuffer"),
00024 videodevice(""), _stream_fd(-1),
00025 readerCB(cb),
00026
00027
00028 dorun(false),
00029 eof(false), error(false),
00030 request_pause(false), paused(false),
00031 using_poll(use_poll),
00032 poll_timeout_is_error(error_exit_on_poll_timeout),
00033 max_poll_wait(2500 ),
00034
00035 size(0), used(0),
00036 read_quanta(0),
00037 dev_read_size(0), min_read(0),
00038
00039 buffer(NULL), readPtr(NULL),
00040 writePtr(NULL), endPtr(NULL),
00041
00042
00043 max_used(0), avg_used(0),
00044 avg_cnt(0)
00045 {
00046 for (int i = 0; i < 2; i++)
00047 {
00048 wake_pipe[i] = -1;
00049 wake_pipe_flags[i] = 0;
00050 }
00051
00052 #if defined( USING_MINGW ) && !defined( _MSC_VER )
00053 #warning mingw DeviceReadBuffer::Poll
00054 if (using_poll)
00055 {
00056 LOG(VB_GENERAL, LOG_WARNING, LOC +
00057 "mingw DeviceReadBuffer::Poll is not implemented");
00058 using_poll = false;
00059 }
00060 #endif
00061 }
00062
00063 DeviceReadBuffer::~DeviceReadBuffer()
00064 {
00065 Stop();
00066 if (buffer)
00067 {
00068 delete[] buffer;
00069 buffer = NULL;
00070 }
00071 }
00072
00073 bool DeviceReadBuffer::Setup(const QString &streamName, int streamfd,
00074 uint readQuanta, uint deviceBufferSize)
00075 {
00076 QMutexLocker locker(&lock);
00077
00078 if (buffer)
00079 delete[] buffer;
00080
00081 videodevice = streamName;
00082 videodevice = (videodevice == QString::null) ? "" : videodevice;
00083 _stream_fd = streamfd;
00084
00085
00086 eof = false;
00087 error = false;
00088 request_pause = false;
00089 paused = false;
00090
00091 read_quanta = (readQuanta) ? readQuanta : read_quanta;
00092 size = gCoreContext->GetNumSetting(
00093 "HDRingbufferSize", 50 * read_quanta) * 1024;
00094 used = 0;
00095 dev_read_size = read_quanta * (using_poll ? 256 : 48);
00096 dev_read_size = (deviceBufferSize) ?
00097 min(dev_read_size, (size_t)deviceBufferSize) : dev_read_size;
00098 min_read = read_quanta * 4;
00099
00100 buffer = new unsigned char[size + dev_read_size];
00101 readPtr = buffer;
00102 writePtr = buffer;
00103 endPtr = buffer + size;
00104
00105
00106 if (!buffer)
00107 {
00108 LOG(VB_GENERAL, LOG_ERR, LOC +
00109 QString("Failed to allocate buffer of size %1 = %2 + %3")
00110 .arg(size+dev_read_size).arg(size).arg(dev_read_size));
00111 return false;
00112 }
00113 memset(buffer, 0xFF, size + read_quanta);
00114
00115
00116 max_used = 0;
00117 avg_used = 0;
00118 avg_cnt = 0;
00119 lastReport.start();
00120
00121 LOG(VB_RECORD, LOG_INFO, LOC + QString("buffer size %1 KB").arg(size/1024));
00122
00123 return true;
00124 }
00125
00126 void DeviceReadBuffer::Start(void)
00127 {
00128 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- begin");
00129
00130 QMutexLocker locker(&lock);
00131 if (isRunning() || dorun)
00132 {
00133 dorun = false;
00134 locker.unlock();
00135 WakePoll();
00136 wait();
00137 locker.relock();
00138 }
00139
00140 dorun = true;
00141 error = false;
00142 eof = false;
00143
00144 start();
00145
00146 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- middle");
00147
00148 while (dorun && !isRunning())
00149 runWait.wait(locker.mutex(), 100);
00150
00151 LOG(VB_RECORD, LOG_INFO, LOC + "Start() -- end");
00152 }
00153
00154 void DeviceReadBuffer::Reset(const QString &streamName, int streamfd)
00155 {
00156 QMutexLocker locker(&lock);
00157
00158 videodevice = streamName;
00159 videodevice = (videodevice == QString::null) ? "" : videodevice;
00160 _stream_fd = streamfd;
00161
00162 used = 0;
00163 readPtr = buffer;
00164 writePtr = buffer;
00165
00166 error = false;
00167 }
00168
00169 void DeviceReadBuffer::Stop(void)
00170 {
00171 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- begin");
00172 QMutexLocker locker(&lock);
00173 if (isRunning() || dorun)
00174 {
00175 dorun = false;
00176 locker.unlock();
00177 WakePoll();
00178 wait();
00179 }
00180 LOG(VB_RECORD, LOG_INFO, LOC + "Stop() -- end");
00181 }
00182
00183 void DeviceReadBuffer::SetRequestPause(bool req)
00184 {
00185 QMutexLocker locker(&lock);
00186 request_pause = req;
00187 WakePoll();
00188 }
00189
00190 void DeviceReadBuffer::SetPaused(bool val)
00191 {
00192 QMutexLocker locker(&lock);
00193 paused = val;
00194 if (val)
00195 pauseWait.wakeAll();
00196 else
00197 unpauseWait.wakeAll();
00198 }
00199
00200
00201 void DeviceReadBuffer::WakePoll(void) const
00202 {
00203 char buf[1];
00204 buf[0] = '0';
00205 ssize_t wret = 0;
00206 while (isRunning() && (wret <= 0) && (wake_pipe[1] >= 0))
00207 {
00208 wret = ::write(wake_pipe[1], &buf, 1);
00209 if ((wret < 0) && (EAGAIN != errno) && (EINTR != errno))
00210 {
00211 LOG(VB_GENERAL, LOG_ERR, LOC + "WakePoll failed.");
00212 ClosePipes();
00213 break;
00214 }
00215 }
00216 }
00217
00218 void DeviceReadBuffer::ClosePipes(void) const
00219 {
00220 for (uint i = 0; i < 2; i++)
00221 {
00222 if (wake_pipe[i] >= 0)
00223 {
00224 ::close(wake_pipe[i]);
00225 wake_pipe[i] = -1;
00226 wake_pipe_flags[i] = 0;
00227 }
00228 }
00229 }
00230
00231 bool DeviceReadBuffer::IsPaused(void) const
00232 {
00233 QMutexLocker locker(&lock);
00234 return paused;
00235 }
00236
00237 bool DeviceReadBuffer::WaitForPaused(unsigned long timeout)
00238 {
00239 QMutexLocker locker(&lock);
00240
00241 if (!paused)
00242 pauseWait.wait(&lock, timeout);
00243
00244 return paused;
00245 }
00246
00247 bool DeviceReadBuffer::WaitForUnpause(unsigned long timeout)
00248 {
00249 QMutexLocker locker(&lock);
00250
00251 if (paused)
00252 unpauseWait.wait(&lock, timeout);
00253
00254 return paused;
00255 }
00256
00257 bool DeviceReadBuffer::IsPauseRequested(void) const
00258 {
00259 QMutexLocker locker(&lock);
00260 return request_pause;
00261 }
00262
00263 bool DeviceReadBuffer::IsErrored(void) const
00264 {
00265 QMutexLocker locker(&lock);
00266 return error;
00267 }
00268
00269 bool DeviceReadBuffer::IsEOF(void) const
00270 {
00271 QMutexLocker locker(&lock);
00272 return eof;
00273 }
00274
00275 bool DeviceReadBuffer::IsRunning(void) const
00276 {
00277 QMutexLocker locker(&lock);
00278 return isRunning();
00279 }
00280
00281 uint DeviceReadBuffer::GetUnused(void) const
00282 {
00283 QMutexLocker locker(&lock);
00284 return size - used;
00285 }
00286
00287 uint DeviceReadBuffer::GetUsed(void) const
00288 {
00289 QMutexLocker locker(&lock);
00290 return used;
00291 }
00292
00293 uint DeviceReadBuffer::GetContiguousUnused(void) const
00294 {
00295 QMutexLocker locker(&lock);
00296 return endPtr - writePtr;
00297 }
00298
00299 void DeviceReadBuffer::IncrWritePointer(uint len)
00300 {
00301 QMutexLocker locker(&lock);
00302 used += len;
00303 writePtr += len;
00304 writePtr = (writePtr >= endPtr) ? buffer + (writePtr - endPtr) : writePtr;
00305 #if REPORT_RING_STATS
00306 max_used = max(used, max_used);
00307 avg_used = ((avg_used * avg_cnt) + used) / ++avg_cnt;
00308 #endif
00309 dataWait.wakeAll();
00310 }
00311
00312 void DeviceReadBuffer::IncrReadPointer(uint len)
00313 {
00314 QMutexLocker locker(&lock);
00315 used -= len;
00316 readPtr += len;
00317 readPtr = (readPtr == endPtr) ? buffer : readPtr;
00318 }
00319
00320 void DeviceReadBuffer::run(void)
00321 {
00322 RunProlog();
00323
00324 uint errcnt = 0;
00325
00326 lock.lock();
00327 runWait.wakeAll();
00328 lock.unlock();
00329
00330 if (using_poll)
00331 setup_pipe(wake_pipe, wake_pipe_flags);
00332
00333 while (dorun)
00334 {
00335 if (!HandlePausing())
00336 continue;
00337
00338 if (!IsOpen())
00339 {
00340 usleep(5000);
00341 continue;
00342 }
00343
00344 if (using_poll && !Poll())
00345 continue;
00346
00347 {
00348 QMutexLocker locker(&lock);
00349 if (error)
00350 {
00351 LOG(VB_RECORD, LOG_ERR, LOC + "fill_ringbuffer: error state");
00352 break;
00353 }
00354 }
00355
00356
00357 size_t unused = (size_t) WaitForUnused(read_quanta);
00358 size_t read_size = min(dev_read_size, unused);
00359
00360
00361 if (read_size)
00362 {
00363 ssize_t len = read(_stream_fd, writePtr, read_size);
00364 if (!CheckForErrors(len, read_size, errcnt))
00365 {
00366 if (errcnt > 5)
00367 break;
00368 else
00369 continue;
00370 }
00371 errcnt = 0;
00372
00373 if (writePtr + len > endPtr)
00374 memcpy(buffer, endPtr, writePtr + len - endPtr);
00375 IncrWritePointer(len);
00376 }
00377 }
00378
00379 ClosePipes();
00380
00381 lock.lock();
00382 eof = true;
00383 runWait.wakeAll();
00384 dataWait.wakeAll();
00385 pauseWait.wakeAll();
00386 unpauseWait.wakeAll();
00387 lock.unlock();
00388
00389 RunEpilog();
00390 }
00391
00392 bool DeviceReadBuffer::HandlePausing(void)
00393 {
00394 if (IsPauseRequested())
00395 {
00396 SetPaused(true);
00397
00398 if (readerCB)
00399 readerCB->ReaderPaused(_stream_fd);
00400
00401 usleep(5000);
00402 return false;
00403 }
00404 else if (IsPaused())
00405 {
00406 Reset(videodevice, _stream_fd);
00407 SetPaused(false);
00408 }
00409 return true;
00410 }
00411
00412 bool DeviceReadBuffer::Poll(void) const
00413 {
00414 #ifdef USING_MINGW
00415 # ifdef _MSC_VER
00416 # pragma message( "mingw DeviceReadBuffer::Poll" )
00417 # else
00418 # warning mingw DeviceReadBuffer::Poll
00419 # endif
00420 LOG(VB_GENERAL, LOG_ERR, LOC +
00421 "mingw DeviceReadBuffer::Poll is not implemented");
00422 return false;
00423 #else
00424 bool retval = true;
00425 MythTimer timer;
00426 timer.start();
00427
00428 int poll_cnt = 1;
00429 struct pollfd polls[2];
00430 memset(polls, 0, sizeof(polls));
00431
00432 polls[0].fd = _stream_fd;
00433 polls[0].events = POLLIN | POLLPRI;
00434 polls[0].revents = 0;
00435
00436 if (wake_pipe[0] >= 0)
00437 {
00438 poll_cnt = 2;
00439 polls[1].fd = wake_pipe[0];
00440 polls[1].events = POLLIN;
00441 polls[1].revents = 0;
00442 }
00443
00444 while (true)
00445 {
00446 polls[0].revents = 0;
00447 polls[1].revents = 0;
00448 poll_cnt = (wake_pipe[0] >= 0) ? poll_cnt : 1;
00449
00450 int timeout = max_poll_wait;
00451 if (1 == poll_cnt)
00452 timeout = 10;
00453 else if (poll_timeout_is_error)
00454 timeout = max((int)max_poll_wait - timer.elapsed(), 10);
00455
00456 int ret = poll(polls, poll_cnt, timeout);
00457
00458 if (polls[0].revents & (POLLHUP | POLLNVAL))
00459 {
00460 LOG(VB_GENERAL, LOG_ERR, LOC + "poll error");
00461 error = true;
00462 return true;
00463 }
00464
00465 if (!dorun || !IsOpen() || IsPauseRequested())
00466 {
00467 retval = false;
00468 break;
00469 }
00470
00471 if (polls[0].revents & POLLPRI)
00472 {
00473 readerCB->PriorityEvent(polls[0].fd);
00474 }
00475
00476 if (polls[0].revents & POLLIN)
00477 {
00478 if (ret > 0)
00479 break;
00480 else if (ret < 0)
00481 {
00482 if ((EOVERFLOW == errno))
00483 break;
00484
00485 if ((EAGAIN == errno) || (EINTR == errno))
00486 continue;
00487
00488 usleep(2500 );
00489 }
00490 else
00491 {
00492 if (poll_timeout_is_error &&
00493 (timer.elapsed() >= (int)max_poll_wait))
00494 {
00495 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 1");
00496 QMutexLocker locker(&lock);
00497 error = true;
00498 return true;
00499 }
00500 }
00501 }
00502
00503
00504 if ((poll_cnt > 1) && (polls[1].revents & POLLIN))
00505 {
00506 char dummy[128];
00507 int cnt = (wake_pipe_flags[0] & O_NONBLOCK) ? 128 : 1;
00508 cnt = ::read(wake_pipe[0], dummy, cnt);
00509 }
00510
00511 if (poll_timeout_is_error && (timer.elapsed() >= (int)max_poll_wait))
00512 {
00513 LOG(VB_GENERAL, LOG_ERR, LOC + "Poll giving up 2");
00514 QMutexLocker locker(&lock);
00515 error = true;
00516 return true;
00517 }
00518 }
00519
00520 int e = timer.elapsed();
00521 if (e > (int)max_poll_wait)
00522 {
00523 LOG(VB_GENERAL, LOG_WARNING, LOC +
00524 QString("Poll took an unusually long time %1 ms")
00525 .arg(timer.elapsed()));
00526 }
00527
00528 return retval;
00529 #endif //!USING_MINGW
00530 }
00531
00532 bool DeviceReadBuffer::CheckForErrors(
00533 ssize_t len, size_t requested_len, uint &errcnt)
00534 {
00535 if (len > (ssize_t)requested_len)
00536 {
00537 LOG(VB_GENERAL, LOG_ERR, LOC +
00538 "Driver is returning bogus values on read");
00539 if (++errcnt > 5)
00540 {
00541 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
00542 QMutexLocker locker(&lock);
00543 error = true;
00544 }
00545 return false;
00546 }
00547
00548 #ifdef USING_MINGW
00549 # ifdef _MSC_VER
00550 # pragma message( "mingw DeviceReadBuffer::CheckForErrors" )
00551 # else
00552 # warning mingw DeviceReadBuffer::CheckForErrors
00553 # endif
00554 LOG(VB_GENERAL, LOG_ERR, LOC +
00555 "mingw DeviceReadBuffer::CheckForErrors is not implemented");
00556 return false;
00557 #else
00558 if (len < 0)
00559 {
00560 if (EINTR == errno)
00561 return false;
00562 if (EAGAIN == errno)
00563 {
00564 usleep(2500);
00565 return false;
00566 }
00567 if (EOVERFLOW == errno)
00568 {
00569 LOG(VB_GENERAL, LOG_ERR, LOC + "Driver buffers overflowed");
00570 return false;
00571 }
00572
00573 LOG(VB_GENERAL, LOG_ERR, LOC +
00574 QString("Problem reading fd(%1)").arg(_stream_fd) + ENO);
00575
00576 if (++errcnt > 5)
00577 {
00578 LOG(VB_RECORD, LOG_ERR, LOC + "Too many errors.");
00579 QMutexLocker locker(&lock);
00580 error = true;
00581 return false;
00582 }
00583
00584 usleep(500);
00585 return false;
00586 }
00587 else if (len == 0)
00588 {
00589 if (++errcnt > 5)
00590 {
00591 LOG(VB_GENERAL, LOG_ERR, LOC +
00592 QString("End-Of-File? fd(%1)").arg(_stream_fd));
00593
00594 lock.lock();
00595 eof = true;
00596 lock.unlock();
00597
00598 return false;
00599 }
00600 usleep(500);
00601 return false;
00602 }
00603 return true;
00604 #endif
00605 }
00606
00613 uint DeviceReadBuffer::Read(unsigned char *buf, const uint count)
00614 {
00615 uint avail = WaitForUsed(min(count, (uint)min_read), 500);
00616 size_t cnt = min(count, avail);
00617
00618 if (!cnt)
00619 return 0;
00620
00621 if (readPtr + cnt > endPtr)
00622 {
00623
00624 size_t len = endPtr - readPtr;
00625 if (len)
00626 {
00627 memcpy(buf, readPtr, len);
00628 buf += len;
00629 IncrReadPointer(len);
00630 }
00631 if (cnt > len)
00632 {
00633 len = cnt - len;
00634 memcpy(buf, readPtr, len);
00635 IncrReadPointer(len);
00636 }
00637 }
00638 else
00639 {
00640 memcpy(buf, readPtr, cnt);
00641 IncrReadPointer(cnt);
00642 }
00643
00644 #if REPORT_RING_STATS
00645 ReportStats();
00646 #endif
00647
00648 return cnt;
00649 }
00650
00655 uint DeviceReadBuffer::WaitForUnused(uint needed) const
00656 {
00657 size_t unused = GetUnused();
00658
00659 if (unused > read_quanta)
00660 {
00661 while (unused < needed)
00662 {
00663 unused = GetUnused();
00664 if (IsPauseRequested() || !IsOpen() || !dorun)
00665 return 0;
00666 usleep(5000);
00667 }
00668 if (IsPauseRequested() || !IsOpen() || !dorun)
00669 return 0;
00670 unused = GetUnused();
00671 }
00672
00673 return unused;
00674 }
00675
00681 uint DeviceReadBuffer::WaitForUsed(uint needed, uint max_wait) const
00682 {
00683 MythTimer timer;
00684 timer.start();
00685
00686 QMutexLocker locker(&lock);
00687 size_t avail = used;
00688 while ((needed > avail) && isRunning() &&
00689 !request_pause && !error && !eof &&
00690 (timer.elapsed() < (int)max_wait))
00691 {
00692 dataWait.wait(locker.mutex(), 10);
00693 avail = used;
00694 }
00695 return avail;
00696 }
00697
00698 void DeviceReadBuffer::ReportStats(void)
00699 {
00700 #if REPORT_RING_STATS
00701 if (lastReport.elapsed() > 20*1000 )
00702 {
00703 QMutexLocker locker(&lock);
00704 double rsize = 100.0 / size;
00705 QString msg = QString("fill avg(%1%) ").arg(avg_used*rsize,3,'f',0);
00706 msg += QString("fill max(%2%) ").arg(max_used*rsize,3,'f',0);
00707 msg += QString("samples(%3)").arg(avg_cnt);
00708
00709 avg_used = 0;
00710 avg_cnt = 0;
00711 max_used = 0;
00712 lastReport.start();
00713
00714 LOG(VB_GENERAL, LOG_DEBUG, LOC + msg);
00715 }
00716 #endif
00717 }
00718
00719
00720
00721