Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry text that failed to send to elevenlabs + fix skipped task_done() call #152

Merged
merged 1 commit into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ async def _run(self, max_retry: int) -> None:
retry_count = 0
listen_task: Optional[asyncio.Task] = None
ws: Optional[aiohttp.ClientWebSocketResponse] = None
retry_text_queue: asyncio.Queue[str] = asyncio.Queue()
while True:
try:
ws = await self._try_connect()
Expand All @@ -194,7 +195,13 @@ async def _run(self, max_retry: int) -> None:
# forward queued text to 11labs
started = False
while not ws.closed:
text = await self._queue.get()
text = None
if retry_text_queue.empty():
text = await retry_text_queue.get()
retry_text_queue.task_done()
else:
text = await self._queue.get()

if not started:
self._event_queue.put_nowait(
tts.SynthesisEvent(type=tts.SynthesisEventType.STARTED)
Expand All @@ -204,7 +211,19 @@ async def _run(self, max_retry: int) -> None:
text=text,
try_trigger_generation=True,
)
await ws.send_str(json.dumps(text_packet))

# This case can happen in normal operation because 11labs will not
# keep connections open indefinitely if we are not sending data.
try:
await ws.send_str(json.dumps(text_packet))
except Exception:
await retry_text_queue.put(text)
break

# We call self._queue.task_done() even if we are retrying the text because
# all text has gone through self._queue. An exception may have short-circuited
# out of the loop so task_done() will not have already been called on text that
# is being retried.
self._queue.task_done()
if text == STREAM_EOS:
await listen_task
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.

__version__ = "0.1.3"
__version__ = "0.1.4"
Loading