2021-04-10 23:39:58 +09:00
import re
import os
import time
# Matches media segment filenames such as 'stream3.m4s', capturing the index.
# The dot before the extension is escaped so e.g. 'stream3xm4s' is rejected
# (an unescaped '.' would match any character).
RE_SEGMENT = re.compile(r'stream(?P<number>\d+)\.m4s')
# Filename of the fMP4 initialisation segment.
SEGMENT_INIT = 'init.mp4'

STREAM_TIMEOUT = 24  # consider the stream offline after this many seconds without a new segment
SEGMENT_OFFSET = 4   # start this many segments back from now (1 is most recent segment)
2021-04-10 23:39:58 +09:00
2021-04-11 15:06:22 +09:00
# TODO: sometimes the stream restarts and StreamOffline is raised; instead of closing the connection, we could keep it open and start appending segments from the new stream.
2021-04-10 23:39:58 +09:00
def _segment_number(fn):
    """Return the integer index of segment filename `fn`, or None for the init segment."""
    if fn == SEGMENT_INIT:
        return None
    match = RE_SEGMENT.fullmatch(fn)
    return int(match.group('number'))
def _is_segment(fn):
    """Return True if `fn` looks like a media segment filename (not the init segment)."""
    return RE_SEGMENT.fullmatch(fn) is not None
def get_next_segment(after, segments_dir):
    """Return the filename of the next segment to serve after `after`.

    - after is None: the client has nothing yet, so serve the init segment.
    - after is the init segment: start SEGMENT_OFFSET segments back from the
      newest, so the client begins slightly behind the live edge.
    - otherwise: poll the directory until a segment numbered higher than
      `after` appears.

    Raises SegmentUnavailable if no newer segment shows up within
    STREAM_TIMEOUT seconds (the stream is presumed offline).
    """
    # The init segment doesn't depend on the directory contents, so answer
    # immediately rather than sleeping first.  (Also use `is None`, not
    # `== None`, for the identity test.)
    if after is None:
        return SEGMENT_INIT
    start = time.time()
    while True:
        segments = get_segments(segments_dir)
        if after == SEGMENT_INIT:
            # Begin SEGMENT_OFFSET segments behind the newest; with fewer
            # segments than that on disk, take the oldest available.
            try:
                return segments[-min(SEGMENT_OFFSET, len(segments))]
            except IndexError:
                pass  # no segments on disk yet; keep polling
        else:
            # Only segments strictly newer than `after` are candidates.
            candidates = [s for s in segments if _segment_number(s) > _segment_number(after)]
            if candidates:
                return min(candidates, key=_segment_number)
            if time.time() - start >= STREAM_TIMEOUT:
                print(f'SegmentUnavailable in get_next_segment; {after=}')
                raise SegmentUnavailable
        time.sleep(1)
2021-04-10 23:39:58 +09:00
def get_segments(segments_dir):
    """Return the media segment filenames in `segments_dir`, sorted by segment number."""
    entries = os.listdir(segments_dir)
    return sorted((entry for entry in entries if _is_segment(entry)), key=_segment_number)
2021-04-11 14:49:42 +09:00
class SegmentUnavailable(Exception):
    """Raised when no new segment appears before the stream timeout elapses."""
class SegmentsIterator:
    """Iterator over stream segment filenames in playback order.

    Yields the init segment first (unless skip_init_segment is true), then
    each media segment as it becomes available, blocking in
    get_next_segment while waiting for new ones.
    """

    def __init__(self, segments_dir, skip_init_segment=False):
        self.segments_dir = segments_dir
        # `segment` records the last filename handed out; None means
        # "nothing yet", which makes get_next_segment yield the init
        # segment first.
        if skip_init_segment:
            self.segment = SEGMENT_INIT
        else:
            self.segment = None

    def __iter__(self):
        return self

    def __next__(self):
        self.segment = get_next_segment(self.segment, self.segments_dir)
        return self.segment
class ConcatenatedSegments:
    """File-like object that concatenates the stream's fMP4 segments on the fly.

    Reading from it yields the init segment followed by each media segment
    in order, blocking (inside SegmentsIterator) until new segments appear.
    """

    def __init__(self, segments_dir, segment_hook=None):
        # segment_hook is called with each fully-served segment's number;
        # defaults to a no-op.
        self.segment_hook = segment_hook or (lambda n: None)
        self.segments_dir = segments_dir
        self._reset()

    def _reset(self, skip_init_segment=False):
        # Start (or restart) iteration from the beginning of the stream.
        print('ConcatenatedSegments._reset')
        self.segments = SegmentsIterator(self.segments_dir, skip_init_segment=skip_init_segment)
        self.segment = next(self.segments)
        # Byte offset of the next read within the current segment file.
        self.segment_read_offset = 0
        self._closed = False

    def _read(self, n):
        # Accumulate up to n bytes, spanning segment boundaries as needed.
        chunk = b''
        while True:
            #print(f'{len(chunk)=}, {n=}, {self.segment=}, {self.segment_read_offset=}')
            #print(f'segment {self.segment} exists:', os.path.isfile(os.path.join(self.segments.segments_dir, self.segment)))
            with open(os.path.join(self.segments.segments_dir, self.segment), 'rb') as fp:
                fp.seek(self.segment_read_offset)
                chunk_chunk = fp.read(n - len(chunk))
            self.segment_read_offset += len(chunk_chunk)
            chunk += chunk_chunk
            if len(chunk) >= n:
                break
            # Current segment is exhausted: rewind the offset and advance to
            # the next segment (may block, or raise SegmentUnavailable).
            self.segment_read_offset = 0
            try:
                next_segment = next(self.segments)
            except SegmentUnavailable:
                print('SegmentUnavailable in ConcatenatedSegments._read')
                raise
            else:
                # Notify the hook that the previous segment was fully served.
                self.segment_hook(_segment_number(self.segment))
                self.segment = next_segment
        #print(f'_read EXIT; {len(chunk)=}, {n=}, {self.segment=}, {self.segment_read_offset=}')
        return chunk

    def read(self, n):
        # Public read; returns b'' once closed (file-object convention).
        if self._closed:
            return b''
        try:
            return self._read(n)
        except (FileNotFoundError, SegmentUnavailable):
            if self.segment == SEGMENT_INIT:
                # Couldn't even serve the init segment: give up entirely.
                self.close()
                return b''
            else:
                # If fragment gets interrupted and we start appending whole new
                # fragments after it, the video will get corrupted.
                # It appears this is very likely to happen if you become
                # extremely delayed. At least it's clear that you need to
                # refresh the page.
                # It's also likely to happen if the reason for the
                # discontinuity is the livestream restarting.
                # TODO: find out how to repair a stream of fragmented mp4s
                #print('DISCONTINUITY in ConcatenatedSegments.read')
                #self._repair() # <-- create this function
                #self._reset(skip_init_segment=True)
                #return self._read(n)
                # this is here so the video gets corrupted
                self._reset(skip_init_segment=True)
                return self._read(n)

    def close(self):
        # Mark the stream closed; subsequent read() calls return b''.
        self._closed = True