Skip to content

Commit

Permalink
Add a little bit of sync
Browse files Browse the repository at this point in the history
Sync - Part One
  • Loading branch information
ckdo committed Jul 18, 2020
1 parent 8c39c10 commit ef5fd48
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 22 deletions.
4 changes: 2 additions & 2 deletions ap2-receiver.py
Original file line number Diff line number Diff line change
Expand Up @@ -408,9 +408,9 @@ def do_SETRATEANCHORTIME(self):

plist = readPlistFromString(body)
if plist["rate"] == 1:
self.server.streams[0].audio_connection.send("play")
self.server.streams[0].audio_connection.send("play-%i" % plist["rtpTime"])
if plist["rate"] == 0:
self.server.streams[0].audio_connection.send("stop")
self.server.streams[0].audio_connection.send("pause")
self.pp.pprint(plist)
self.send_response(200)
self.send_header("Server", self.version_string())
Expand Down
116 changes: 96 additions & 20 deletions ap2/connections/audio.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import multiprocessing
import enum
import threading
import time

import av
import numpy
Expand Down Expand Up @@ -104,7 +105,7 @@ def can_read(self):
def next(self):
# print("read - %i %i" % (self.read_index, self.write_index))
if self.read_index == -1:
raise Exception("buffer: read is not possible - empty buffer")
print("buffer: read is not possible - empty buffer")
return None
else:
buffered_object = self.buffer_array[self.read_index]
Expand Down Expand Up @@ -150,10 +151,9 @@ def find_seq(self, seq):
else:
start_index = self.increment_index(start_index)

# Flush - Must be called from reader (player)
def flush_read(self):
self.read_index = -1

# initialize buffer for reading
def init(self):
self.read_index = self.write_index

# Flush - Must be called from writer (server)
def flush_write(self, index_from):
Expand Down Expand Up @@ -316,59 +316,121 @@ def __init__(self, session_key, audio_format):
super(AudioBuffered, self).__init__(session_key, audio_format)
self.socket = get_free_tcp_socket()
self.port = self.socket.getsockname()[1]
self.anchorMonotonicTime = None
self.anchorRtpTime = None

def get_time_offset(self, rtp_ts):
if not self.anchorRtpTime:
return 0
rtptime_offset = rtp_ts - self.anchorRtpTime
realtime_offset_ms = (time.monotonic_ns() - self.anchorMonotonicTime) / 10 ** 6
# TODO: replace 44100 with real framerate
time_offset_ms = (1000 * rtptime_offset / 44100) - realtime_offset_ms
return time_offset_ms

def get_min_timestamp(self):
realtime_offset_sec = (time.monotonic_ns() - self.anchorMonotonicTime) / 10 ** 9
print("player: get_min_timestamp - realtime_offset_sec={:06.4f}".format(realtime_offset_sec))
res = self.anchorRtpTime + realtime_offset_sec * 44100
print("player: get_min_timestamp return=%i" % res)

return res


def forward(self, requested_timestamp):
finished = False
while not finished:
rtp = self.rtp_buffer.next()
if rtp:
if rtp.timestamp >= requested_timestamp:
finished = True
else:
pass
#print("player: still forwarding.. ts=%i" % rtp.timestamp)
else:
print("player: !!! error while forwarding !!!")
finished = True

# player moves readindex in buffer
def play(self, rtspconn, serverconn):
playing = False
data_ready = False
data_ontime = True
i = 0
while True:
if not playing:
timeout = None
rtsp_timeout = None
else:
rtsp_timeout = 0
if not data_ontime:
server_timeout = None
else:
timeout = 0
server_timeout = 0

if self.rtp_buffer.can_read():
data_ready = True

if serverconn.poll():
if serverconn.poll(server_timeout):
message = serverconn.recv()
if message == "data_ready":
data_ready = True
elif message == "data_ontime_response":
print("player: ontime data response received")
ts = self.get_min_timestamp()
print("player: forwarding to timestamp %i" % ts)
self.forward(ts)

data_ontime = True

if rtspconn.poll(timeout):
if rtspconn.poll(rtsp_timeout):
message = rtspconn.recv()
if message == "stop":
if str.startswith(message, "play"):
self.anchorMonotonicTime = time.monotonic_ns()
self.anchorRtpTime = int(str.split(message, "-")[1])

playing = True

elif message == "pause":
playing = False
data_ready = False
if message == "play":
playing = True

if str.startswith(message, "flush_from_until_seq"):
elif str.startswith(message, "flush_from_until_seq"):
pending_flush_from_seq, pending_flush_until_seq = str.split(message, "-")[-2:]
pending_flush_from_seq = int(pending_flush_from_seq)
pending_flush_until_seq = int(pending_flush_until_seq)

print("player: request flush received from-until %i-%i" % (pending_flush_from_seq, pending_flush_until_seq))
if pending_flush_from_seq == 0:
# only until is provided -> flush all the buffer
print("player: flush all buffer")
self.rtp_buffer.flush_read()
print("player: relay message to server to flush from-until sequence %i-%i" % (pending_flush_from_seq, pending_flush_until_seq))
serverconn.send(message)

if playing and data_ready:
rtp = self.rtp_buffer.next()
if rtp:
time_offset_ms = self.get_time_offset(rtp.timestamp)
if i % 100 == 0:
print("player: offset is %i ms" % time_offset_ms)
if time_offset_ms >= 100:
print("player: offset %i ms too big - seq = %i - sleeping %s sec" % (time_offset_ms, rtp.sequence_no, "{:05.2f}".format(time_offset_ms /1000)))
time.sleep(time_offset_ms / 1000)
elif time_offset_ms < -100:
print("player: offset %i ms too low - seq = %i - sending ontime data request" % (time_offset_ms, rtp.sequence_no))
# request on_time data message
serverconn.send("on_time_data_request")
data_ontime = False

audio = self.process(rtp)
self.sink.write(audio)
i += 1

# server moves write index in buffer
# the exception to this rule is the buffer initialization (init call)
def serve(self, playerconn):
self.logger = get_logger("audio", level="DEBUG")
self.init_audio_sink()

conn, addr = self.socket.accept()
seq_to_overtake = None
pending_ontime_data_request = False
try:
while True:
while playerconn.poll():
Expand All @@ -384,6 +446,9 @@ def serve(self, playerconn):
print("server: successfully flushed - write index moved to %i" % from_index)
else:
print("server: flush did not move write index")
elif message == "on_time_data_request":
print("server: ontime data request received")
pending_ontime_data_request = True

message = conn.recv(2, socket.MSG_WAITALL)
if message:
Expand All @@ -392,16 +457,27 @@ def serve(self, playerconn):

rtp = RTP_BUFFERED(data)
self.handle(rtp)
# do not write data if it is expired
if seq_to_overtake is None or rtp.sequence_no >= seq_to_overtake:
time_offset_ms = self.get_time_offset(rtp.timestamp)
#print("server: writing seq %i timeoffset %i" % (rtp.sequence_no, time_offset_ms))
if seq_to_overtake is None:
self.rtp_buffer.add(rtp)
if seq_to_overtake is not None:
else:
print("server: searching sequence %i - current is %i" % (seq_to_overtake, rtp.sequence_no))
# do not write data if it is expired
if rtp.sequence_no >= seq_to_overtake:
if pending_flush_from_seq == 0:
print("server: buffer initialisation")
self.rtp_buffer.init()
self.rtp_buffer.add(rtp)
print("server: requested sequence to overtake %i - receiving sequence %i" % (seq_to_overtake, rtp.sequence_no))
# as soon as we overtake seq_to_overtake sequence, let's inform the player
playerconn.send("data_ready")
seq_to_overtake = None
if pending_ontime_data_request:
if time_offset_ms >= 100:
pending_ontime_data_request = False
playerconn.send("data_ontime_response")
print("server: ontime data response sent")

except KeyboardInterrupt:
pass
Expand Down

0 comments on commit ef5fd48

Please sign in to comment.