00001
00002 #include <cmath>
00003 #include <cstdio>
00004 #include <cstdlib>
00005 #include <cerrno>
00006
00007
00008 #include <sys/types.h>
00009 #include <sys/time.h>
00010 #include <unistd.h>
00011 #include <fcntl.h>
00012
00013
00014 #include <QFile>
00015 #include <QDateTime>
00016
00017 #include "ThreadedFileWriter.h"
00018 #include "fileringbuffer.h"
00019 #include "streamingringbuffer.h"
00020 #include "livetvchain.h"
00021 #include "mythcontext.h"
00022 #include "ringbuffer.h"
00023 #include "mythconfig.h"
00024 #include "remotefile.h"
00025 #include "compat.h"
00026 #include "mythmiscutil.h"
00027 #include "mythlogging.h"
00028 #include "DVD/dvdringbuffer.h"
00029 #include "Bluray/bdringbuffer.h"
00030 #include "HLS/httplivestreambuffer.h"
00031
00032
// Read-ahead buffer sizing.
//
// BUFFER_SIZE_MINIMUM is the base size in bytes of the read-ahead buffer; the
// BUFFER_FACTOR_* values are multipliers applied to it for remote files,
// Matroska files and streams whose bitrate is only estimated.
//
// Every body is parenthesized so the macro always expands as one operand,
// whatever operators surround it at the point of use.
#define BUFFER_SIZE_MINIMUM    (4 * 1024 * 1024)
#define BUFFER_FACTOR_NETWORK  (2)
#define BUFFER_FACTOR_BITRATE  (2)
#define BUFFER_FACTOR_MATROSKA (2)
00037
// Open timeouts exposed as class constants.
// NOTE(review): presumably milliseconds (Create() takes a timeout_ms
// parameter) -- confirm against the header.
00038 const int RingBuffer::kDefaultOpenTimeout = 2000;
00039 const int RingBuffer::kLiveTVOpenTimeout = 10000;
00040
// Granularity in bytes used when sizing and rounding read blocks.
00041 #define CHUNK 32768
00042
// Log-line prefix; expands to a QString built from the member `filename`, so
// it can only be used inside RingBuffer member functions.
00043 #define LOC QString("RingBuf(%1): ").arg(filename)
00044
// Subtitle extension lists shared by all instances. They are filled once,
// under subExtLock, by the first RingBuffer constructor to run.
00045 QMutex RingBuffer::subExtLock;
00046 QStringList RingBuffer::subExt;
00047 QStringList RingBuffer::subExtNoCheck;
00048
00049
00050
00051
00052
00053
00054
00055
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069
00070
00071
00072
00099 RingBuffer *RingBuffer::Create(
00100 const QString &xfilename, bool write,
00101 bool usereadahead, int timeout_ms, bool stream_only)
00102 {
00103 QString lfilename = xfilename;
00104 QString lower = lfilename.toLower();
00105
00106 if (write)
00107 return new FileRingBuffer(lfilename, write, usereadahead, timeout_ms);
00108
00109 bool dvddir = false;
00110 bool bddir = false;
00111 bool httpurl = lower.startsWith("http://") || lower.startsWith("https://");
00112 bool mythurl = lower.startsWith("myth://");
00113 bool bdurl = lower.startsWith("bd:");
00114 bool dvdurl = lower.startsWith("dvd:");
00115 bool dvdext = lower.endsWith(".img") || lower.endsWith(".iso");
00116
00117 if (httpurl)
00118 {
00119 if (HLSRingBuffer::TestForHTTPLiveStreaming(lfilename))
00120 {
00121 return new HLSRingBuffer(lfilename);
00122 }
00123 return new StreamingRingBuffer(lfilename);
00124 }
00125 if (!stream_only && mythurl)
00126 {
00127 struct stat fileInfo;
00128 if ((RemoteFile::Exists(lfilename, &fileInfo)) &&
00129 (S_ISDIR(fileInfo.st_mode)))
00130 {
00131 if (RemoteFile::Exists(lfilename + "/VIDEO_TS"))
00132 dvddir = true;
00133 else if (RemoteFile::Exists(lfilename + "/BDMV"))
00134 bddir = true;
00135 }
00136 }
00137 else if (!stream_only && !mythurl)
00138 {
00139 if (QFile::exists(lfilename + "/VIDEO_TS"))
00140 dvddir = true;
00141 else if (QFile::exists(lfilename + "/BDMV"))
00142 bddir = true;
00143 }
00144
00145 if (!stream_only && (dvdurl || dvddir || dvdext))
00146 {
00147 if (lfilename.left(4) == "dvd:")
00148 lfilename.remove(0,4);
00149
00150 if (!(mythurl || QFile::exists(lfilename)))
00151 lfilename = "/dev/dvd";
00152 LOG(VB_PLAYBACK, LOG_INFO, "Trying DVD at " + lfilename);
00153
00154 return new DVDRingBuffer(lfilename);
00155 }
00156 else if (!stream_only && (bdurl || bddir))
00157 {
00158 if (lfilename.left(3) == "bd:")
00159 lfilename.remove(0,3);
00160
00161 if (!(mythurl || QFile::exists(lfilename)))
00162 lfilename = "/dev/dvd";
00163 LOG(VB_PLAYBACK, LOG_INFO, "Trying BD at " + lfilename);
00164
00165 return new BDRingBuffer(lfilename);
00166 }
00167
00168 return new FileRingBuffer(
00169 lfilename, write, usereadahead, timeout_ms);
00170 }
00171
00172 RingBuffer::RingBuffer(RingBufferType rbtype) :
00173 MThread("RingBuffer"),
00174 type(rbtype),
00175 readpos(0), writepos(0),
00176 internalreadpos(0), ignorereadpos(-1),
00177 rbrpos(0), rbwpos(0),
00178 stopreads(false), safefilename(QString()),
00179 filename(), subtitlefilename(),
00180 tfw(NULL), fd2(-1),
00181 writemode(false), remotefile(NULL),
00182 bufferSize(BUFFER_SIZE_MINIMUM),
00183 low_buffers(false),
00184 fileismatroska(false), unknownbitrate(false),
00185 startreadahead(false), readAheadBuffer(NULL),
00186 readaheadrunning(false), reallyrunning(false),
00187 request_pause(false), paused(false),
00188 ateof(false), readsallowed(false),
00189 setswitchtonext(false),
00190 rawbitrate(8000), playspeed(1.0f),
00191 fill_threshold(65536), fill_min(-1),
00192 readblocksize(CHUNK), wanttoread(0),
00193 numfailures(0), commserror(false),
00194 oldfile(false), livetvchain(NULL),
00195 ignoreliveeof(false), readAdjust(0),
00196 bitrateMonitorEnabled(false)
00197 {
00198 {
00199 QMutexLocker locker(&subExtLock);
00200 if (subExt.empty())
00201 {
00202
00203 subExt += ".srt";
00204 subExt += ".sub";
00205 subExt += ".txt";
00206
00207
00208 subExtNoCheck = subExt;
00209 subExtNoCheck += ".gif";
00210 subExtNoCheck += ".png";
00211 }
00212 }
00213 }
00214
00218 RingBuffer::~RingBuffer(void)
00219 {
00220 KillReadAheadThread();
00221
00222 rwlock.lockForWrite();
00223
00224 if (readAheadBuffer)
00225 {
00226 delete [] readAheadBuffer;
00227 readAheadBuffer = NULL;
00228 }
00229
00230 if (tfw)
00231 {
00232 tfw->Flush();
00233 delete tfw;
00234 tfw = NULL;
00235 }
00236
00237 rwlock.unlock();
00238
00239 wait();
00240 }
00241
00245 void RingBuffer::Reset(bool full, bool toAdjust, bool resetInternal)
00246 {
00247 LOG(VB_FILE, LOG_INFO, LOC + QString("Reset(%1,%2,%3)")
00248 .arg(full).arg(toAdjust).arg(resetInternal));
00249
00250 rwlock.lockForWrite();
00251 poslock.lockForWrite();
00252
00253 numfailures = 0;
00254 commserror = false;
00255 setswitchtonext = false;
00256
00257 writepos = 0;
00258 readpos = (toAdjust) ? (readpos - readAdjust) : 0;
00259
00260 if (readpos != 0)
00261 {
00262 LOG(VB_GENERAL, LOG_ERR, LOC +
00263 QString("RingBuffer::Reset() nonzero readpos. toAdjust: %1 "
00264 "readpos: %2 readAdjust: %3")
00265 .arg(toAdjust).arg(readpos).arg(readAdjust));
00266 }
00267
00268 readAdjust = 0;
00269 readpos = (readpos < 0) ? 0 : readpos;
00270
00271 if (full)
00272 ResetReadAhead(readpos);
00273
00274 if (resetInternal)
00275 internalreadpos = readpos;
00276
00277 generalWait.wakeAll();
00278 poslock.unlock();
00279 rwlock.unlock();
00280 }
00281
00287 void RingBuffer::UpdateRawBitrate(uint raw_bitrate)
00288 {
00289 LOG(VB_FILE, LOG_INFO, LOC +
00290 QString("UpdateRawBitrate(%1Kb)").arg(raw_bitrate));
00291
00292
00293
00294 if (raw_bitrate < 64)
00295 {
00296 LOG(VB_FILE, LOG_INFO, LOC +
00297 QString("Bitrate too low - setting to 64Kb"));
00298 raw_bitrate = 64;
00299 }
00300
00301 rwlock.lockForWrite();
00302 rawbitrate = raw_bitrate;
00303 CalcReadAheadThresh();
00304 rwlock.unlock();
00305 }
00306
00311 void RingBuffer::UpdatePlaySpeed(float play_speed)
00312 {
00313 rwlock.lockForWrite();
00314 playspeed = play_speed;
00315 CalcReadAheadThresh();
00316 rwlock.unlock();
00317 }
00318
00324 void RingBuffer::SetBufferSizeFactors(bool estbitrate, bool matroska)
00325 {
00326 rwlock.lockForWrite();
00327 unknownbitrate = estbitrate;
00328 fileismatroska = matroska;
00329 rwlock.unlock();
00330 CreateReadAheadBuffer();
00331 }
00332
// Recomputes readblocksize, fill_threshold, fill_min and low_buffers from
// rawbitrate, playspeed and bufferSize, and clears readsallowed.
// The function takes no lock itself. The callers in this file hold rwlock
// for writing (UpdateRawBitrate, UpdatePlaySpeed, CreateReadAheadBuffer) or
// reach it through ResetReadAhead().
00340 void RingBuffer::CalcReadAheadThresh(void)
00341 {
00342 uint estbitrate = 0;
00343
00344 readsallowed = false;
// Never let the block size drop below one CHUNK at this point.
00345 readblocksize = max(readblocksize, CHUNK);
00346
00347
// Threshold is 7/8 of the buffer size.
00348 fill_threshold = 7 * bufferSize / 8;
00349
00350 const uint KB2 = 2*1024;
00351 const uint KB4 = 4*1024;
00352 const uint KB8 = 8*1024;
00353 const uint KB32 = 32*1024;
00354 const uint KB64 = 64*1024;
00355 const uint KB128 = 128*1024;
00356 const uint KB256 = 256*1024;
00357 const uint KB512 = 512*1024;
00358
// Effective bitrate: raw bitrate scaled by play speed, bounded to the range
// [0.5 * rawbitrate, 3 * rawbitrate].
00359 estbitrate = (uint) max(abs(rawbitrate * playspeed),
00360 0.5f * rawbitrate);
00361 estbitrate = min(rawbitrate * 3, estbitrate);
// Map the effective bitrate onto a read block size.
00362 int const rbs = (estbitrate > 18000) ? KB512 :
00363 (estbitrate > 9000) ? KB256 :
00364 (estbitrate > 5000) ? KB128 :
00365 (estbitrate > 2500) ? KB64 :
00366 (estbitrate >= 500) ? KB32 :
00367 (estbitrate > 250) ? KB8 :
00368 (estbitrate > 125) ? KB4 : KB2;
// Below one CHUNK the table value is used as-is; otherwise the block size
// is only ever raised here, never lowered.
00369 if (rbs < CHUNK)
00370 readblocksize = rbs;
00371 else
00372 readblocksize = max(rbs,readblocksize);
00373
00374
00375 float secs_min = 0.25;
00376
// secs_min seconds of data at estbitrate; the 0.125 factor divides by 8.
00377 fill_min = (uint) ((estbitrate * secs_min) * 0.125f);
00378
00379 if (fill_min >= CHUNK || rbs >= CHUNK)
00380 {
00381 if (low_buffers)
00382 {
00383 LOG(VB_GENERAL, LOG_INFO, LOC +
00384 "Buffering optimisations disabled.");
00385 }
00386 low_buffers = false;
// Round fill_min up to the next multiple of CHUNK; an exact multiple still
// gains one extra CHUNK.
00387 fill_min = ((fill_min / CHUNK) + 1) * CHUNK;
00388 }
00389 else
00390 {
// Low-bitrate stream: fill_min keeps its small, unrounded value.
00391 low_buffers = true;
00392 LOG(VB_GENERAL, LOG_WARNING, "Enabling buffering optimisations "
00393 "for low bitrate stream.");
00394 }
00395
00396 LOG(VB_FILE, LOG_INFO, LOC +
00397 QString("CalcReadAheadThresh(%1 Kb)\n\t\t\t -> "
00398 "threshhold(%2 KB) min read(%3 KB) blk size(%4 KB)")
00399 .arg(estbitrate).arg(fill_threshold/1024)
00400 .arg(fill_min/1024).arg(readblocksize/1024));
00401 }
00402
00403 bool RingBuffer::IsNearEnd(double fps, uint vvf) const
00404 {
00405 rwlock.lockForRead();
00406 int sz = ReadBufAvail();
00407 uint rbs = readblocksize;
00408
00409 uint tmp = (uint) max(abs(rawbitrate * playspeed), 0.5f * rawbitrate);
00410 uint kbits_per_sec = min(rawbitrate * 3, tmp);
00411 rwlock.unlock();
00412
00413
00414
00415
00416 double bytes_per_frame = kbits_per_sec * (1000.0/8.0) / fps;
00417 double readahead_frames = sz / bytes_per_frame;
00418
00419 bool near_end = ((vvf + readahead_frames) < 20.0) || (sz < rbs*1.5);
00420
00421 LOG(VB_PLAYBACK, LOG_INFO, LOC + "IsReallyNearEnd()" +
00422 QString(" br(%1KB)").arg(kbits_per_sec/8) +
00423 QString(" sz(%1KB)").arg(sz / 1000) +
00424 QString(" vfl(%1)").arg(vvf) +
00425 QString(" frh(%1)").arg(((uint)readahead_frames)) +
00426 QString(" ne:%1").arg(near_end));
00427
00428 return near_end;
00429 }
00430
00433 int RingBuffer::ReadBufFree(void) const
00434 {
00435 rbrlock.lockForRead();
00436 rbwlock.lockForRead();
00437 int ret = ((rbwpos >= rbrpos) ? rbrpos + bufferSize : rbrpos) - rbwpos - 1;
00438 rbwlock.unlock();
00439 rbrlock.unlock();
00440 return ret;
00441 }
00442
00445 int RingBuffer::ReadBufAvail(void) const
00446 {
00447 rbrlock.lockForRead();
00448 rbwlock.lockForRead();
00449 int ret = (rbwpos >= rbrpos) ? rbwpos - rbrpos : bufferSize - rbrpos + rbwpos;
00450 rbwlock.unlock();
00451 rbrlock.unlock();
00452 return ret;
00453 }
00454
00466 void RingBuffer::ResetReadAhead(long long newinternal)
00467 {
00468 LOG(VB_FILE, LOG_INFO, LOC +
00469 QString("ResetReadAhead(internalreadpos = %1->%2)")
00470 .arg(internalreadpos).arg(newinternal));
00471
00472 rbrlock.lockForWrite();
00473 rbwlock.lockForWrite();
00474
00475 CalcReadAheadThresh();
00476 rbrpos = 0;
00477 rbwpos = 0;
00478 internalreadpos = newinternal;
00479 ateof = false;
00480 readsallowed = false;
00481 setswitchtonext = false;
00482 generalWait.wakeAll();
00483
00484 rbwlock.unlock();
00485 rbrlock.unlock();
00486 }
00487
00502 void RingBuffer::Start(void)
00503 {
00504 bool do_start = true;
00505
00506 rwlock.lockForWrite();
00507 if (!startreadahead)
00508 {
00509 do_start = false;
00510 }
00511 else if (writemode)
00512 {
00513 LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
00514 "this is a write only RingBuffer");
00515 do_start = false;
00516 }
00517 else if (readaheadrunning)
00518 {
00519 LOG(VB_GENERAL, LOG_WARNING, LOC + "Not starting read ahead thread, "
00520 "already running");
00521 do_start = false;
00522 }
00523
00524 if (!do_start)
00525 {
00526 rwlock.unlock();
00527 return;
00528 }
00529
00530 StartReads();
00531
00532 MThread::start();
00533
00534 while (readaheadrunning && !reallyrunning)
00535 generalWait.wait(&rwlock);
00536
00537 rwlock.unlock();
00538 }
00539
00543 void RingBuffer::KillReadAheadThread(void)
00544 {
00545 while (isRunning())
00546 {
00547 rwlock.lockForWrite();
00548 readaheadrunning = false;
00549 StopReads();
00550 generalWait.wakeAll();
00551 rwlock.unlock();
00552 MThread::wait(5000);
00553 }
00554 }
00555
00560 void RingBuffer::StopReads(void)
00561 {
00562 LOG(VB_FILE, LOG_INFO, LOC + "StopReads()");
00563 stopreads = true;
00564 generalWait.wakeAll();
00565 }
00566
00571 void RingBuffer::StartReads(void)
00572 {
00573 LOG(VB_FILE, LOG_INFO, LOC + "StartReads()");
00574 stopreads = false;
00575 generalWait.wakeAll();
00576 }
00577
00582 void RingBuffer::Pause(void)
00583 {
00584 LOG(VB_FILE, LOG_INFO, LOC + "Pause()");
00585 StopReads();
00586
00587 rwlock.lockForWrite();
00588 request_pause = true;
00589 rwlock.unlock();
00590 }
00591
00596 void RingBuffer::Unpause(void)
00597 {
00598 LOG(VB_FILE, LOG_INFO, LOC + "Unpause()");
00599 StartReads();
00600
00601 rwlock.lockForWrite();
00602 request_pause = false;
00603 generalWait.wakeAll();
00604 rwlock.unlock();
00605 }
00606
00608 bool RingBuffer::isPaused(void) const
00609 {
00610 rwlock.lockForRead();
00611 bool ret = !readaheadrunning || paused;
00612 rwlock.unlock();
00613 return ret;
00614 }
00615
00619 void RingBuffer::WaitForPause(void)
00620 {
00621 MythTimer t;
00622 t.start();
00623
00624 rwlock.lockForRead();
00625 while (readaheadrunning && !paused && request_pause)
00626 {
00627 generalWait.wait(&rwlock, 1000);
00628 if (readaheadrunning && !paused && request_pause && t.elapsed() > 1000)
00629 {
00630 LOG(VB_GENERAL, LOG_WARNING, LOC +
00631 QString("Waited %1 ms for ringbuffer pause..")
00632 .arg(t.elapsed()));
00633 }
00634 }
00635 rwlock.unlock();
00636 }
00637
// Pause handshake, called from run().
// Expects rwlock to be held for reading on entry and returns with it held
// for reading again. In between it is released and re-taken for writing
// whenever `paused` has to change.
// Returns true while a pause is requested or still in effect.
00638 bool RingBuffer::PauseAndWait(void)
00639 {
00640 const uint timeout = 500;
00641
00642 if (request_pause)
00643 {
00644 if (!paused)
00645 {
// Swap the read lock for a write lock so `paused` can be set.
00646 rwlock.unlock();
00647 rwlock.lockForWrite();
00648
// The request may have been withdrawn while no lock was held; re-check.
00649 if (request_pause)
00650 {
00651 paused = true;
00652 generalWait.wakeAll();
00653 }
00654
00655 rwlock.unlock();
00656 rwlock.lockForRead();
00657 }
00658
// Sleep while paused, for at most `timeout` ms per call.
00659 if (request_pause && paused && readaheadrunning)
00660 generalWait.wait(&rwlock, timeout);
00661 }
00662
// Leave the paused state once the request is gone.
00663 if (!request_pause && paused)
00664 {
00665 rwlock.unlock();
00666 rwlock.lockForWrite();
00667
// Re-check for the same reason as above.
00668 if (!request_pause)
00669 {
00670 paused = false;
00671 generalWait.wakeAll();
00672 }
00673
00674 rwlock.unlock();
00675 rwlock.lockForRead();
00676 }
00677
00678 return request_pause || paused;
00679 }
00680
// Allocates the read-ahead buffer, or grows it while preserving its contents.
// The target size is BUFFER_SIZE_MINIMUM, multiplied by the network,
// Matroska and unknown-bitrate factors when `remotefile` is set.
// An existing buffer is never shrunk.
// Takes rwlock and poslock for writing for the duration of the resize.
00681 void RingBuffer::CreateReadAheadBuffer(void)
00682 {
00683 rwlock.lockForWrite();
00684 poslock.lockForWrite();
00685
00686 uint oldsize = bufferSize;
00687 uint newsize = BUFFER_SIZE_MINIMUM;
00688 if (remotefile)
00689 {
00690 newsize *= BUFFER_FACTOR_NETWORK;
00691 if (fileismatroska)
00692 newsize *= BUFFER_FACTOR_MATROSKA;
00693 if (unknownbitrate)
00694 newsize *= BUFFER_FACTOR_BITRATE;
00695 }
00696
00697
// Nothing to do when a buffer of sufficient size already exists.
00698 if (readAheadBuffer && oldsize >= newsize)
00699 {
00700 poslock.unlock();
00701 rwlock.unlock();
00702 return;
00703 }
00704
00705 bufferSize = newsize;
00706 if (readAheadBuffer)
00707 {
// Grow: copy the old ring rotated so that the old write position lands at
// index 0 of the new buffer. The old contents then occupy [0, oldsize).
00708 char* newbuffer = new char[bufferSize + 1024];
00709 memcpy(newbuffer, readAheadBuffer + rbwpos, oldsize - rbwpos);
00710 memcpy(newbuffer + (oldsize - rbwpos), readAheadBuffer, rbwpos);
00711 delete [] readAheadBuffer;
00712 readAheadBuffer = newbuffer;
// Shift the read index by the same rotation. The strict '>' maps the empty
// case (rbrpos == rbwpos) to oldsize, which equals the new rbwpos, so the
// buffer stays empty.
00713 rbrpos = (rbrpos > rbwpos) ? (rbrpos - rbwpos) :
00714 (rbrpos + oldsize - rbwpos);
// Writing resumes directly after the copied data.
00715 rbwpos = oldsize;
00716 }
00717 else
00718 {
// First allocation; 1024 extra bytes are allocated beyond bufferSize.
00719 readAheadBuffer = new char[bufferSize + 1024];
00720 }
00721 CalcReadAheadThresh();
00722 poslock.unlock();
00723 rwlock.unlock();
00724
00725 LOG(VB_FILE, LOG_INFO, LOC + QString("Created readAheadBuffer: %1Mb")
00726 .arg(newsize >> 20));
00727 }
00728
// Read-ahead thread body.
// Sets up the buffer, then loops while readaheadrunning is true: it honours
// pause requests, reads a block via safe_read() into the ring at rbwpos,
// adapts readblocksize to the measured interval between reads, and keeps
// readsallowed / ateof / setswitchtonext up to date for the reading side.
// On exit it resets the indices and frees the read-ahead buffer.
// rwlock is held for reading for most of the loop and is swapped for a write
// lock only around the state updates.
00729 void RingBuffer::run(void)
00730 {
00731 RunProlog();
00732
00733
// Timing state used to adapt readblocksize.
00734 struct timeval lastread, now;
00735 int readtimeavg = 300;
00736 bool ignore_for_read_timing = true;
00737
00738 gettimeofday(&lastread, NULL);
00739
// Startup: allocate the buffer, reset it and announce that the thread is
// running.
00740 CreateReadAheadBuffer();
00741 rwlock.lockForWrite();
00742 poslock.lockForWrite();
00743 request_pause = false;
00744 ResetReadAhead(0);
00745 readaheadrunning = true;
00746 reallyrunning = true;
00747 generalWait.wakeAll();
00748 poslock.unlock();
00749 rwlock.unlock();
00750
00751
00752
00753
00754
00755 rwlock.lockForRead();
00756
00757 LOG(VB_FILE, LOG_INFO, LOC +
00758 QString("Initial readblocksize %1K & fill_min %2K")
00759 .arg(readblocksize/1024).arg(fill_min/1024));
00760
00761 while (readaheadrunning)
00762 {
// While paused, do nothing and exclude this iteration from read timing.
00763 if (PauseAndWait())
00764 {
00765 ignore_for_read_timing = true;
00766 continue;
00767 }
00768
00769 long long totfree = ReadBufFree();
00770
00771 const uint KB32 = 32*1024;
00772
00773
// Wait when the buffer is nearly full and readable, or when reads are
// blocked by ignorereadpos, commserror or stopreads.
00774 if (((totfree < KB32) && readsallowed) ||
00775 (ignorereadpos >= 0) || commserror || stopreads)
00776 {
00777 ignore_for_read_timing |=
00778 (ignorereadpos >= 0) || commserror || stopreads;
00779 generalWait.wait(&rwlock, (stopreads) ? 50 : 1000);
00780 continue;
00781 }
00782
00783
00784
// At end of file or with a program switch pending: wait, then refresh
// the free-space figure.
00785 if (setswitchtonext || (ateof && readsallowed))
00786 {
00787 ignore_for_read_timing = true;
00788 generalWait.wait(&rwlock, 1000);
00789 totfree = ReadBufFree();
00790 }
00791
00792 int read_return = -1;
00793 if (totfree >= KB32 && !commserror &&
00794 !ateof && !setswitchtonext)
00795 {
00796
// From here on totfree holds the number of bytes to request: the block
// size, or the free space rounded down to a multiple of 32KB.
00797 if (readblocksize > totfree)
00798 totfree = (int)(totfree / KB32) * KB32;
00799 else
00800 totfree = readblocksize;
00801
00802
// Adapt readblocksize to a running average of the interval between reads:
// grow it by half when reads come quickly, shrink it by one CHUNK when
// they come slowly.
00803 gettimeofday(&now, NULL);
00804 if (!ignore_for_read_timing)
00805 {
00806 int readinterval = (now.tv_sec - lastread.tv_sec ) * 1000 +
00807 (now.tv_usec - lastread.tv_usec) / 1000;
00808 readtimeavg = (readtimeavg * 9 + readinterval) / 10;
00809
00810 if (readtimeavg < 150 &&
00811 (uint)readblocksize < (BUFFER_SIZE_MINIMUM >>2) &&
00812 readblocksize >= CHUNK )
00813 {
00814 int old_block_size = readblocksize;
00815 readblocksize = 3 * readblocksize / 2;
00816 readblocksize = ((readblocksize+CHUNK-1) / CHUNK) * CHUNK;
00817 LOG(VB_FILE, LOG_INFO, LOC +
00818 QString("Avg read interval was %1 msec. "
00819 "%2K -> %3K block size")
00820 .arg(readtimeavg)
00821 .arg(old_block_size/1024)
00822 .arg(readblocksize/1024));
00823 readtimeavg = 225;
00824 }
00825 else if (readtimeavg > 300 && readblocksize > CHUNK)
00826 {
00827 readblocksize -= CHUNK;
00828 LOG(VB_FILE, LOG_INFO, LOC +
00829 QString("Avg read interval was %1 msec. "
00830 "%2K -> %3K block size")
00831 .arg(readtimeavg)
00832 .arg((readblocksize+CHUNK)/1024)
00833 .arg(readblocksize/1024));
00834 readtimeavg = 225;
00835 }
00836 }
// A read shorter than the block size is excluded from the next timing sample.
00837 ignore_for_read_timing = (totfree < readblocksize) ? true : false;
00838 lastread = now;
00839
00840 rbwlock.lockForRead();
// Do not write past the end of the buffer.
00841 if (rbwpos + totfree > bufferSize)
00842 {
00843 totfree = bufferSize - rbwpos;
00844 LOG(VB_FILE, LOG_DEBUG, LOC +
00845 "Shrinking read, near end of buffer");
00846 }
00847
// At internal position 0, request at least fill_min bytes.
00848 if (internalreadpos == 0)
00849 {
00850 totfree = max(fill_min, readblocksize);
00851 LOG(VB_FILE, LOG_DEBUG, LOC +
00852 "Reading enough data to start playback");
00853 }
00854
00855 LOG(VB_FILE, LOG_DEBUG, LOC +
00856 QString("safe_read(...@%1, %2) -- begin")
00857 .arg(rbwpos).arg(totfree));
00858
00859 MythTimer sr_timer;
00860 sr_timer.start();
00861
// Remember where this read starts. rbwlock is released during the read, so
// the write position is compared against this copy afterwards.
00862 int rbwposcopy = rbwpos;
00863
00864
00865
00866 rbwlock.unlock();
00867
00868 read_return = safe_read(readAheadBuffer + rbwposcopy, totfree);
00869
// NOTE(review): if safe_read() reports errors as a negative value, the
// double -> uint64_t cast below is applied to a negative number, which the
// language does not define. Confirm safe_read()'s error return and consider
// guarding this.
00870 int sr_elapsed = sr_timer.elapsed();
00871 uint64_t bps = !sr_elapsed ? 1000000001 :
00872 (uint64_t)(((double)read_return * 8000.0) /
00873 (double)sr_elapsed);
00874 LOG(VB_FILE, LOG_INFO, LOC +
00875 QString("safe_read(...@%1, %2) -> %3, took %4 ms %5")
00876 .arg(rbwposcopy).arg(totfree).arg(read_return)
00877 .arg(sr_elapsed)
00878 .arg(QString("(%1Mbps)").arg((double)bps / 1000000.0)));
00879 UpdateStorageRate(bps);
00880
00881 if (read_return >= 0)
00882 {
00883 poslock.lockForWrite();
00884 rbwlock.lockForWrite();
// Commit the read only if the write position did not move during it.
00885 if (rbwposcopy == rbwpos)
00886 {
00887 internalreadpos += read_return;
00888 rbwpos = (rbwpos + read_return) % bufferSize;
00889 LOG(VB_FILE, LOG_DEBUG,
00890 LOC + QString("rbwpos += %1K requested %2K in read")
00891 .arg(read_return/1024,3).arg(totfree/1024,3));
00892 }
00893 rbwlock.unlock();
00894 poslock.unlock();
00895 }
00896 }
00897
00898 int used = bufferSize - ReadBufFree();
00899
00900 bool reads_were_allowed = readsallowed;
00901
// Take the write lock only when state must change: a zero-byte read, too
// many failures, or a change in whether reads should be allowed.
00902 if ((0 == read_return) || (numfailures > 5) ||
00903 (readsallowed != (used >= fill_min || ateof ||
00904 setswitchtonext || commserror)))
00905 {
00906
00907
00908 long long old_readpos = readpos;
00909
00910 rwlock.unlock();
00911 rwlock.lockForWrite();
00912
00913 commserror |= (numfailures > 5);
00914
00915 readsallowed = used >= fill_min || ateof ||
00916 setswitchtonext || commserror;
00917
// Zero bytes read and readpos unchanged across the lock swap: either
// switch to the next LiveTV program or flag end of file.
00918 if (0 == read_return && old_readpos == readpos)
00919 {
00920 if (livetvchain)
00921 {
00922 if (!setswitchtonext && !ignoreliveeof &&
00923 livetvchain->HasNext())
00924 {
00925 livetvchain->SwitchToNext(true);
00926 setswitchtonext = true;
00927 }
00928 }
00929 else
00930 {
00931 LOG(VB_FILE, LOG_DEBUG,
00932 LOC + "setting ateof (read_return == 0)");
00933 ateof = true;
00934 }
00935 }
00936
00937 rwlock.unlock();
00938 rwlock.lockForRead();
00939 used = bufferSize - ReadBufFree();
00940 }
00941
00942 LOG(VB_FILE, LOG_DEBUG, LOC + "@ end of read ahead loop");
00943
// Wake waiting threads and release the lock for 5 ms so they can run.
00944 if (!readsallowed || commserror || ateof || setswitchtonext ||
00945 (wanttoread <= used && wanttoread > 0))
00946 {
00947
00948
00949
00950 generalWait.wakeAll();
00951 rwlock.unlock();
00952 usleep(5 * 1000);
00953 rwlock.lockForRead();
00954 }
00955 else
00956 {
00957
// Buffer is past the fill threshold: wait up to 50 ms before reading more.
00958 if (!request_pause && reads_were_allowed &&
00959 (used >= fill_threshold || ateof || setswitchtonext))
00960 {
00961 generalWait.wait(&rwlock, 50);
00962 }
00963 else if (readsallowed)
00964 {
00965
00966 generalWait.wakeAll();
00967 rwlock.unlock();
00968 usleep(5 * 1000);
00969 rwlock.lockForRead();
00970 }
00971 }
00972 }
00973
00974 rwlock.unlock();
00975
// Shutdown: take all three locks for writing, reset the indices and free
// the buffer.
00976 rwlock.lockForWrite();
00977 rbrlock.lockForWrite();
00978 rbwlock.lockForWrite();
00979
00980 rbrpos = 0;
00981 rbwpos = 0;
00982 reallyrunning = false;
00983 readsallowed = false;
00984 delete [] readAheadBuffer;
00985
00986 readAheadBuffer = NULL;
00987 rbwlock.unlock();
00988 rbrlock.unlock();
00989 rwlock.unlock();
00990
00991 RunEpilog();
00992 }
00993
00994 long long RingBuffer::SetAdjustFilesize(void)
00995 {
00996 rwlock.lockForWrite();
00997 poslock.lockForRead();
00998 readAdjust += internalreadpos;
00999 long long ra = readAdjust;
01000 poslock.unlock();
01001 rwlock.unlock();
01002 return ra;
01003 }
01004
01005 int RingBuffer::Peek(void *buf, int count)
01006 {
01007 int ret = ReadPriv(buf, count, true);
01008 if (ret != count)
01009 {
01010 LOG(VB_GENERAL, LOG_WARNING, LOC +
01011 QString("Peek() requested %1 bytes, but only returning %2")
01012 .arg(count).arg(ret));
01013 }
01014 return ret;
01015 }
01016
01017 bool RingBuffer::WaitForReadsAllowed(void)
01018 {
01019 MythTimer t;
01020 t.start();
01021
01022 while (!readsallowed && !stopreads &&
01023 !request_pause && !commserror && readaheadrunning)
01024 {
01025 generalWait.wait(&rwlock, 1000);
01026 if (!readsallowed && t.elapsed() > 1000)
01027 {
01028 LOG(VB_GENERAL, LOG_WARNING, LOC +
01029 "Taking too long to be allowed to read..");
01030
01031 if (t.elapsed() > 10000)
01032 {
01033 LOG(VB_GENERAL, LOG_ERR, LOC + "Took more than 10 seconds to "
01034 "be allowed to read, aborting.");
01035 return false;
01036 }
01037 }
01038 }
01039
01040 return readsallowed;
01041 }
01042
// Waits until at least `count` bytes are available in the read-ahead buffer.
// Waits on generalWait with rwlock, so the caller must hold rwlock (ReadPriv
// calls it under the read lock).
// At end of file `count` is reduced to what is available.
// Returns true if the (possibly reduced) count is available on exit, and
// false on a pending LiveTV switch or after waiting more than 16 seconds.
01043 bool RingBuffer::WaitForAvail(int count)
01044 {
01045 int avail = ReadBufAvail();
// At EOF no more data will arrive, so settle for what is buffered.
01046 count = (ateof && avail < count) ? avail : count;
01047
// A program switch is pending and the buffer cannot satisfy the request:
// reload the chain and let the caller handle the short read.
01048 if (livetvchain && setswitchtonext && avail < count)
01049 {
01050 LOG(VB_GENERAL, LOG_INFO, LOC +
01051 "Checking to see if there's a new livetv program to switch to..");
01052 livetvchain->ReloadAll();
01053 return false;
01054 }
01055
01056
01057
// Wake the other waiters before starting to wait for data.
01058 if ((avail < count) && !stopreads &&
01059 !request_pause && !commserror && readaheadrunning)
01060 {
01061 generalWait.wakeAll();
01062 }
01063
01064 MythTimer t;
01065 t.start();
01066 while ((avail < count) && !stopreads &&
01067 !request_pause && !commserror && readaheadrunning)
01068 {
// Publish the wanted amount; run() compares it with the buffered amount.
01069 wanttoread = count;
01070 generalWait.wait(&rwlock, 250);
01071 avail = ReadBufAvail();
01072
01073 if (ateof && avail < count)
01074 count = avail;
01075
01076 if (avail < count)
01077 {
01078 int elapsed = t.elapsed();
// In low-buffer mode, accept a short read after 500 ms once at least
// fill_min bytes are available.
01079 if (elapsed > 500 && low_buffers && avail >= fill_min)
01080 count = avail;
// Otherwise log progress only inside these time windows.
01081 else if (((elapsed > 250) && (elapsed < 500)) ||
01082 ((elapsed > 500) && (elapsed < 750)) ||
01083 ((elapsed > 1000) && (elapsed < 1250)) ||
01084 ((elapsed > 2000) && (elapsed < 2250)) ||
01085 ((elapsed > 4000) && (elapsed < 4250)) ||
01086 ((elapsed > 8000) && (elapsed < 8250)) ||
01087 ((elapsed > 9000)))
01088 {
01089 LOG(VB_GENERAL, LOG_INFO, LOC + "Waited " +
01090 QString("%1").arg((elapsed / 250) * 0.25f, 3, 'f', 1) +
01091 " seconds for data \n\t\t\tto become available..." +
01092 QString(" %2 < %3") .arg(avail).arg(count));
01093 }
01094
// Give up after 16 seconds.
// NOTE(review): wanttoread is not reset to 0 on this path; ReadPriv
// resets it after a false return.
01095 if (elapsed > 16000)
01096 {
01097 LOG(VB_GENERAL, LOG_ERR, LOC + "Waited " +
01098 QString("%1").arg(elapsed/1000) +
01099 " seconds for data, aborting.");
01100 return false;
01101 }
01102 }
01103 }
01104
01105 wanttoread = 0;
01106
01107 return avail >= count;
01108 }
01109
01110 int RingBuffer::ReadDirect(void *buf, int count, bool peek)
01111 {
01112 long long old_pos = 0;
01113 if (peek)
01114 {
01115 poslock.lockForRead();
01116 old_pos = (ignorereadpos >= 0) ? ignorereadpos : readpos;
01117 poslock.unlock();
01118 }
01119
01120 MythTimer timer;
01121 timer.start();
01122 int ret = safe_read(buf, count);
01123 int elapsed = timer.elapsed();
01124 uint64_t bps = !elapsed ? 1000000001 :
01125 (uint64_t)(((float)ret * 8000.0) / (float)elapsed);
01126 UpdateStorageRate(bps);
01127
01128 poslock.lockForWrite();
01129 if (ignorereadpos >= 0 && ret > 0)
01130 {
01131 if (peek)
01132 {
01133
01134 if (remotefile)
01135 remotefile->Seek(old_pos, SEEK_SET);
01136 else if (fd2 >= 0)
01137 lseek64(fd2, old_pos, SEEK_SET);
01138 }
01139 else
01140 {
01141 ignorereadpos += ret;
01142 }
01143 poslock.unlock();
01144 return ret;
01145 }
01146 poslock.unlock();
01147
01148 if (peek && ret > 0)
01149 {
01150 if ((IsDVD() || IsBD()) && old_pos != 0)
01151 {
01152 LOG(VB_GENERAL, LOG_ERR, LOC +
01153 "DVD and Blu-Ray do not support arbitrary "
01154 "peeks except when read-ahead is enabled."
01155 "\n\t\t\tWill seek to beginning of video.");
01156 old_pos = 0;
01157 }
01158
01159 long long new_pos = Seek(old_pos, SEEK_SET, true);
01160
01161 if (new_pos != old_pos)
01162 {
01163 LOG(VB_GENERAL, LOG_ERR, LOC +
01164 QString("Peek() Failed to return from new "
01165 "position %1 to old position %2, now "
01166 "at position %3")
01167 .arg(old_pos - ret).arg(old_pos).arg(new_pos));
01168 }
01169 }
01170
01171 return ret;
01172 }
01173
// Common implementation behind Read() and Peek().
// Returns -1 with errno set for a write-mode buffer (EBADF) or after a comms
// error (EIO). It reads via ReadDirect() when read-ahead is not usable, and
// otherwise copies from the read-ahead ring once enough data is available.
// With peek == true the ring read index is left untouched.
// Returns 0 if waiting for data failed, else the number of bytes copied.
01182 int RingBuffer::ReadPriv(void *buf, int count, bool peek)
01183 {
01184 QString loc_desc = QString("ReadPriv(..%1, %2)")
01185 .arg(count).arg(peek?"peek":"normal");
01186 LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
01187 QString(" @%1 -- begin").arg(rbrpos));
01188
01189 rwlock.lockForRead();
01190 if (writemode)
01191 {
01192 LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
01193 ": Attempt to read from a write only file");
01194 errno = EBADF;
01195 rwlock.unlock();
01196 return -1;
01197 }
01198
01199 if (commserror)
01200 {
01201 LOG(VB_GENERAL, LOG_ERR, LOC + loc_desc +
01202 ": Attempt to read after commserror set");
01203 errno = EIO;
01204 rwlock.unlock();
01205 return -1;
01206 }
01207
// Read-ahead is unusable (paused, stopped, not running, or ignorereadpos in
// effect), so read directly, under the write lock.
01208 if (request_pause || stopreads || !readaheadrunning || (ignorereadpos>=0))
01209 {
01210 rwlock.unlock();
01211 rwlock.lockForWrite();
01212
01213
01214
01215
01216
// Re-check: the state may have changed while no lock was held.
01217 if (request_pause || stopreads ||
01218 !readaheadrunning || (ignorereadpos >= 0))
01219 {
01220 int ret = ReadDirect(buf, count, peek);
// NOTE(review): the checksum covers `count` bytes even when ReadDirect
// returned fewer bytes or an error -- confirm this is acceptable.
01221 LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc +
01222 QString(": ReadDirect checksum %1")
01223 .arg(qChecksum((char*)buf,count)));
01224 rwlock.unlock();
01225 return ret;
01226 }
01227 rwlock.unlock();
01228 rwlock.lockForRead();
01229 }
01230
01231 if (!WaitForReadsAllowed())
01232 {
01233 LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForReadsAllowed()");
01234 rwlock.unlock();
// NOTE(review): stopreads is written after rwlock was released and before
// the write lock is taken -- confirm this is intentional.
01235 stopreads = true;
01236 rwlock.lockForWrite();
01237 wanttoread = 0;
01238 rwlock.unlock();
01239 return 0;
01240 }
01241
01242 if (!WaitForAvail(count))
01243 {
01244 LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": !WaitForAvail()");
01245 rwlock.unlock();
// NOTE(review): same unlocked write to stopreads as above.
01246 stopreads = true;
01247 rwlock.lockForWrite();
01248 ateof = true;
01249 wanttoread = 0;
01250 rwlock.unlock();
01251 return 0;
01252 }
01253
// Never copy more than is actually buffered.
01254 count = min(ReadBufAvail(), count);
01255
01256 if (count <= 0)
01257 {
01258
01259
01260
01261 LOG(VB_FILE, LOG_NOTICE, LOC + loc_desc + ": ReadBufAvail() == 0");
01262 rwlock.unlock();
01263 return count;
01264 }
01265
// A peek only reads rbrpos; a real read advances it and needs the write lock.
01266 if (peek)
01267 rbrlock.lockForRead();
01268 else
01269 rbrlock.lockForWrite();
01270
01271 LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + " -- copying data");
01272
// The requested range wraps past the end of the ring: copy in two pieces.
01273 if (rbrpos + count > (int) bufferSize)
01274 {
01275 int firstsize = bufferSize - rbrpos;
01276 int secondsize = count - firstsize;
01277
01278 memcpy(buf, readAheadBuffer + rbrpos, firstsize);
01279 memcpy((char *)buf + firstsize, readAheadBuffer, secondsize);
01280 }
01281 else
01282 {
01283 memcpy(buf, readAheadBuffer + rbrpos, count);
01284 }
01285 LOG(VB_FILE, LOG_DEBUG, LOC + loc_desc + QString(" -- checksum %1")
01286 .arg(qChecksum((char*)buf,count)));
01287
01288 if (!peek)
01289 {
// Consume the data and wake the threads waiting on generalWait.
01290 rbrpos = (rbrpos + count) % bufferSize;
01291 generalWait.wakeAll();
01292 }
01293 rbrlock.unlock();
01294 rwlock.unlock();
01295
01296 return count;
01297 }
01298
01307 int RingBuffer::Read(void *buf, int count)
01308 {
01309 int ret = ReadPriv(buf, count, false);
01310 if (ret > 0)
01311 {
01312 poslock.lockForWrite();
01313 readpos += ret;
01314 poslock.unlock();
01315 }
01316
01317 UpdateDecoderRate(ret);
01318 return ret;
01319 }
01320
01321 QString RingBuffer::BitrateToString(uint64_t rate, bool hz)
01322 {
01323 QString msg;
01324 float bitrate;
01325 int range = 0;
01326 if (rate < 1)
01327 {
01328 return "-";
01329 }
01330 else if (rate > 1000000000)
01331 {
01332 return QObject::tr(">1Gbps");
01333 }
01334 else if (rate >= 1000000)
01335 {
01336 msg = hz ? QObject::tr("%1MHz") : QObject::tr("%1Mbps");
01337 bitrate = (float)rate / (1000000.0);
01338 range = hz ? 3 : 1;
01339 }
01340 else if (rate >= 1000)
01341 {
01342 msg = hz ? QObject::tr("%1kHz") : QObject::tr("%1kbps");
01343 bitrate = (float)rate / 1000.0;
01344 range = hz ? 1 : 0;
01345 }
01346 else
01347 {
01348 msg = hz ? QObject::tr("%1Hz") : QObject::tr("%1bps");
01349 bitrate = (float)rate;
01350 }
01351 return msg.arg(bitrate, 0, 'f', range);
01352 }
01353
01354 QString RingBuffer::GetDecoderRate(void)
01355 {
01356 return BitrateToString(UpdateDecoderRate());
01357 }
01358
01359 QString RingBuffer::GetStorageRate(void)
01360 {
01361 return BitrateToString(UpdateStorageRate());
01362 }
01363
01364 QString RingBuffer::GetAvailableBuffer(void)
01365 {
01366 if (type == kRingBuffer_DVD || type == kRingBuffer_BD)
01367 return "N/A";
01368
01369 int avail = (rbwpos >= rbrpos) ? rbwpos - rbrpos : bufferSize - rbrpos + rbwpos;
01370 return QString("%1%").arg((int)(((float)avail / (float)bufferSize) * 100.0));
01371 }
01372
01373 uint64_t RingBuffer::UpdateDecoderRate(uint64_t latest)
01374 {
01375 if (!bitrateMonitorEnabled)
01376 return 0;
01377
01378
01379 static QTime midnight = QTime(0, 0, 0);
01380 QTime now = QTime::currentTime();
01381 qint64 age = midnight.msecsTo(now);
01382 qint64 oldest = age - 1000;
01383
01384 decoderReadLock.lock();
01385 if (latest)
01386 decoderReads.insert(age, latest);
01387
01388 uint64_t total = 0;
01389 QMutableMapIterator<qint64,uint64_t> it(decoderReads);
01390 while (it.hasNext())
01391 {
01392 it.next();
01393 if (it.key() < oldest || it.key() > age)
01394 it.remove();
01395 else
01396 total += it.value();
01397 }
01398
01399 uint64_t average = (uint64_t)((double)total * 8.0);
01400 decoderReadLock.unlock();
01401
01402 LOG(VB_FILE, LOG_INFO, LOC + QString("Decoder read speed: %1 %2")
01403 .arg(average).arg(decoderReads.size()));
01404 return average;
01405 }
01406
01407 uint64_t RingBuffer::UpdateStorageRate(uint64_t latest)
01408 {
01409 if (!bitrateMonitorEnabled)
01410 return 0;
01411
01412
01413 static QTime midnight = QTime(0, 0, 0);
01414 QTime now = QTime::currentTime();
01415 qint64 age = midnight.msecsTo(now);
01416 qint64 oldest = age - 1000;
01417
01418 storageReadLock.lock();
01419 if (latest)
01420 storageReads.insert(age, latest);
01421
01422 uint64_t total = 0;
01423 QMutableMapIterator<qint64,uint64_t> it(storageReads);
01424 while (it.hasNext())
01425 {
01426 it.next();
01427 if (it.key() < oldest || it.key() > age)
01428 it.remove();
01429 else
01430 total += it.value();
01431 }
01432
01433 int size = storageReads.size();
01434 storageReadLock.unlock();
01435
01436 uint64_t average = size ? (uint64_t)(((double)total) / (double)size) : 0;
01437
01438 LOG(VB_FILE, LOG_INFO, LOC + QString("Average storage read speed: %1 %2")
01439 .arg(average).arg(storageReads.size()));
01440 return average;
01441 }
01442
01447 int RingBuffer::Write(const void *buf, uint count)
01448 {
01449 rwlock.lockForRead();
01450
01451 if (!writemode)
01452 {
01453 LOG(VB_GENERAL, LOG_ERR, LOC + "Tried to write to a read only file.");
01454 rwlock.unlock();
01455 return -1;
01456 }
01457
01458 if (!tfw && !remotefile)
01459 {
01460 rwlock.unlock();
01461 return -1;
01462 }
01463
01464 int ret = -1;
01465 if (tfw)
01466 ret = tfw->Write(buf, count);
01467 else
01468 ret = remotefile->Write(buf, count);
01469
01470 if (ret > 0)
01471 {
01472 poslock.lockForWrite();
01473 writepos += ret;
01474 poslock.unlock();
01475 }
01476
01477 rwlock.unlock();
01478
01479 return ret;
01480 }
01481
01485 void RingBuffer::Sync(void)
01486 {
01487 rwlock.lockForRead();
01488 if (tfw)
01489 tfw->Sync();
01490 rwlock.unlock();
01491 }
01492
01495 long long RingBuffer::WriterSeek(long long pos, int whence, bool has_lock)
01496 {
01497 long long ret = -1;
01498
01499 if (!has_lock)
01500 rwlock.lockForRead();
01501
01502 poslock.lockForWrite();
01503
01504 if (tfw)
01505 {
01506 ret = tfw->Seek(pos, whence);
01507 writepos = ret;
01508 }
01509
01510 poslock.unlock();
01511
01512 if (!has_lock)
01513 rwlock.unlock();
01514
01515 return ret;
01516 }
01517
01522 void RingBuffer::WriterFlush(void)
01523 {
01524 rwlock.lockForRead();
01525 if (tfw)
01526 {
01527 tfw->Flush();
01528 tfw->Sync();
01529 }
01530 rwlock.unlock();
01531 }
01532
01536 void RingBuffer::SetWriteBufferMinWriteSize(int newMinSize)
01537 {
01538 rwlock.lockForRead();
01539 if (tfw)
01540 tfw->SetWriteBufferMinWriteSize(newMinSize);
01541 rwlock.unlock();
01542 }
01543
01559 void RingBuffer::SetOldFile(bool is_old)
01560 {
01561 LOG(VB_FILE, LOG_INFO, LOC + QString("SetOldFile(%1)").arg(is_old));
01562 rwlock.lockForWrite();
01563 oldfile = is_old;
01564 rwlock.unlock();
01565 }
01566
01568 QString RingBuffer::GetFilename(void) const
01569 {
01570 rwlock.lockForRead();
01571 QString tmp = filename;
01572 rwlock.unlock();
01573 return tmp;
01574 }
01575
01576 QString RingBuffer::GetSubtitleFilename(void) const
01577 {
01578 rwlock.lockForRead();
01579 QString tmp = subtitlefilename;
01580 rwlock.unlock();
01581 return tmp;
01582 }
01583
01587 long long RingBuffer::GetWritePosition(void) const
01588 {
01589 poslock.lockForRead();
01590 long long ret = writepos;
01591 poslock.unlock();
01592 return ret;
01593 }
01594
01599 bool RingBuffer::LiveMode(void) const
01600 {
01601 rwlock.lockForRead();
01602 bool ret = (livetvchain);
01603 rwlock.unlock();
01604 return ret;
01605 }
01606
01611 void RingBuffer::SetLiveMode(LiveTVChain *chain)
01612 {
01613 rwlock.lockForWrite();
01614 livetvchain = chain;
01615 rwlock.unlock();
01616 }
01617
01619 void RingBuffer::IgnoreLiveEOF(bool ignore)
01620 {
01621 rwlock.lockForWrite();
01622 ignoreliveeof = ignore;
01623 rwlock.unlock();
01624 }
01625
01626 const DVDRingBuffer *RingBuffer::DVD(void) const
01627 {
01628 return dynamic_cast<const DVDRingBuffer*>(this);
01629 }
01630
01631 const BDRingBuffer *RingBuffer::BD(void) const
01632 {
01633 return dynamic_cast<const BDRingBuffer*>(this);
01634 }
01635
01636 DVDRingBuffer *RingBuffer::DVD(void)
01637 {
01638 return dynamic_cast<DVDRingBuffer*>(this);
01639 }
01640
01641 BDRingBuffer *RingBuffer::BD(void)
01642 {
01643 return dynamic_cast<BDRingBuffer*>(this);
01644 }
01645
01646