From 532456b7dbfd640a0467e17cbb23f421d75bf507 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:14:12 +0300 Subject: [PATCH 01/16] Rewrite token generation, add web-server --- potoken.py | 233 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 potoken.py diff --git a/potoken.py b/potoken.py new file mode 100644 index 0000000..2df51be --- /dev/null +++ b/potoken.py @@ -0,0 +1,233 @@ +import argparse +import asyncio +import json +import logging +import time +from socketserver import ThreadingMixIn +from tempfile import TemporaryDirectory +from typing import Callable, Optional, Tuple +from wsgiref.simple_server import WSGIServer, make_server + +import nodriver + + +class PotokenExtractor: + + def __init__(self, loop: asyncio.AbstractEventLoop, update_interfal: float = 3600 * 12) -> None: + self.update_interval: float = update_interfal + self.profile_path = TemporaryDirectory() # cleaned up on exit by nodriver + self._loop = loop + self._token_info: Optional[dict] = None + self._ongoing_update: asyncio.Lock = asyncio.Lock() + self._extraction_done: asyncio.Event = asyncio.Event() + self._update_requested: asyncio.Event = asyncio.Event() + + def get(self) -> Optional[dict]: + return self._token_info + + async def run_once(self) -> Optional[dict]: + await self._update() + return self.get() + + async def run(self): + await self._update() + while True: + try: + await asyncio.wait_for(self._update_requested.wait(), timeout=self.update_interval) + logging.debug('initiating force update') + except asyncio.TimeoutError: + logging.debug('initiating scheduled update') + await self._update() + self._update_requested.clear() + + def request_update(self) -> bool: + """Request immediate update, return False if update request is already set""" + if self._ongoing_update.locked(): + logging.debug('update process is already running') + return False + if 
self._update_requested.is_set(): + logging.debug('force update has already been requested') + return False + self._loop.call_soon_threadsafe(self._update_requested.set) + logging.debug('force update requested') + return True + + @staticmethod + def _extract_token(request: nodriver.cdp.network.Request) -> Optional[dict]: + post_data = request.post_data + try: + post_data_json = json.loads(post_data) + visitor_data = post_data_json['context']['client']['visitorData'] + potoken = post_data_json['serviceIntegrityDimensions']['poToken'] + except (json.JSONDecodeError, TypeError, KeyError) as e: + logging.warning(f'failed to extract token from request: {type(e)}, {e}') + return None + token_info = { + 'updated': int(time.time()), + 'potoken': potoken, + 'visitor_data': visitor_data + } + return token_info + + async def _update(self): + if self._ongoing_update.locked(): + logging.debug('update is already in progress') + return + + async with self._ongoing_update: + logging.info(f'update started') + self._extraction_done.clear() + + browser = await nodriver.start(headless=False, user_data_dir=self.profile_path.name) + tab = browser.main_tab + tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) + await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') + player = await tab.select("#movie_player") + await player.click() + + try: + await asyncio.wait_for(self._extraction_done.wait(), timeout=30) + except asyncio.TimeoutError: + logging.warning(f'update failed') + else: + logging.info('update was succeessful') + + await tab.close() + browser.stop() + + async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): + if not event.request.method == 'POST': + return + if not '/youtubei/v1/player' in event.request.url: + return + token_info = self._extract_token(event.request) + if token_info is None: + return + logging.info(f'new token: {token_info}') + self._token_info = token_info + self._extraction_done.set() + + +class 
ThreadingWSGIServer(WSGIServer, ThreadingMixIn): + """Thread per request HTTP server.""" + daemon_threads = True + + +class PotokenServer: + + def __init__(self, potoken_extractor: PotokenExtractor, port: int = 8080, bind_address: str = '0.0.0.0'): + self.port = port + self.bind_address = bind_address + self._potoken_extractor = potoken_extractor + self._httpd: Optional[ThreadingWSGIServer] = None + + def get_potoken(self) -> Tuple[str, list, str]: + token = self._potoken_extractor.get() + if token is None: + status = '502 Bad Gateway' + headers = [("Content-Type", "text/plain")] + page = 'Token has not yet been generated, try again later.' + else: + status = '200 OK' + headers = [("Content-Type", "application/json")] + page = json.dumps(token) + return status, headers, page + + def request_update(self) -> Tuple[str, list, str]: + status = '200 OK' + headers = [("Content-Type", "text/plain")] + + accepted = self._potoken_extractor.request_update() + if accepted: + page = 'Update request accepted, new token will be generated soon.' + else: + page = 'Update has already been requested, new token will be generated soon.' 
+ + return status, headers, page + + def get_route_handler(self, route: str) -> Callable[[], Tuple[str, list, str]]: + handlers = { + # handler is a function returning a tuple of status, headers, page text + '/404': lambda: ('404 Not Found', [("Content-Type", "text/plain")], 'Not Found'), + '/': lambda: ('302 Found', [('Location', '/token')], '/token'), + '/token': self.get_potoken, + '/update': self.request_update + } + return handlers.get(route) or handlers['/404'] + + def app(self, environ, start_response): + route = environ['PATH_INFO'] + + handler = self.get_route_handler(route) + status, headers, page = handler() + + start_response(status, headers) + return [page.encode('utf8')] + + def run(self): + logging.info(f'Starting web-server at {self.bind_address}:{self.port}') + self._httpd = make_server(self.bind_address, self.port, self.app, ThreadingWSGIServer) + with self._httpd: + self._httpd.serve_forever() + + def stop(self): + if self._httpd is None: + return + self._httpd.shutdown() + + +def main(update_interval, bind_address, port) -> None: + loop = nodriver.loop() + + potoken_extractor = PotokenExtractor(loop, update_interfal=update_interval) + potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) + + extractor_task = loop.create_task(potoken_extractor.run()) + server_task = loop.create_task(asyncio.to_thread(potoken_server.run)) + + try: + main_task = asyncio.gather(extractor_task, server_task) + loop.run_until_complete(main_task) + except Exception: + # exceptions raised by the tasks are intentionally propogated + # to ensure process exit code is 1 on error + raise + except (KeyboardInterrupt, asyncio.CancelledError): + logging.info('Stopping...') + finally: + potoken_server.stop() + + +def set_logging(log_level=logging.DEBUG): + log_format = '%(asctime)s.%(msecs)03d [%(name)s] [%(levelname)s] %(message)s' + datefmt = '%Y/%m/%d %H:%M:%S' + logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt) + 
logging.getLogger('asyncio').setLevel(logging.INFO) + logging.getLogger('nodriver').setLevel(logging.WARNING) + logging.getLogger('uc').setLevel(logging.WARNING) + logging.getLogger('websockets').setLevel(logging.WARNING) + + +def args_parse(): + description = ''' +Retrieve potoken using Chromium runned by nodriver, serve it on a json endpoint + + Token is generated on startup, and then every UPDATE_INTERVAL seconds. + With web-server running on default port, the token is available on the + http://127.0.0.1:8080/token endpoint. It is possible to request immediate + token regeneration by accessing http://127.0.0.1:8080/update + ''' + parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('--update-interval', '-u', type=int, default=3600 * 12, + help='How ofthen new token is generated, in seconds (default: %(default)s)') + parser.add_argument('--port', '-p', type=int, default=8080, + help='Port webserver is listening on (default: %(default)s)') + parser.add_argument('--bind', '-b', default='0.0.0.0', + help='Address webserver binds to (default: %(default)s)') + return parser.parse_args() + + +if __name__ == '__main__': + set_logging() + args = args_parse() + main(update_interval=args.update_interval, bind_address=args.bind, port=args.port) From f0a16740acc8cd99842bbf6e4c26104b63998e27 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:12:45 +0300 Subject: [PATCH 02/16] Replace use of TemporaryDirectory with mkdtemp Turns out TemporaryDirectory initiates cleanup on interpreter exit even when it is used without context manager, which might interfere with nodriver trying to delete the directory at the same time. 
--- potoken.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/potoken.py b/potoken.py index 2df51be..b833f95 100644 --- a/potoken.py +++ b/potoken.py @@ -4,7 +4,7 @@ import logging import time from socketserver import ThreadingMixIn -from tempfile import TemporaryDirectory +from tempfile import mkdtemp from typing import Callable, Optional, Tuple from wsgiref.simple_server import WSGIServer, make_server @@ -15,7 +15,7 @@ class PotokenExtractor: def __init__(self, loop: asyncio.AbstractEventLoop, update_interfal: float = 3600 * 12) -> None: self.update_interval: float = update_interfal - self.profile_path = TemporaryDirectory() # cleaned up on exit by nodriver + self.profile_path = mkdtemp() # cleaned up on exit by nodriver self._loop = loop self._token_info: Optional[dict] = None self._ongoing_update: asyncio.Lock = asyncio.Lock() @@ -78,7 +78,7 @@ async def _update(self): logging.info(f'update started') self._extraction_done.clear() - browser = await nodriver.start(headless=False, user_data_dir=self.profile_path.name) + browser = await nodriver.start(headless=False, user_data_dir=self.profile_path) tab = browser.main_tab tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') From 06dcee42dc8d27e058623a5b1ca3eea00ea215b2 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:12:59 +0300 Subject: [PATCH 03/16] Handle possibility of "#movie_player" selector timing out It might happen if loaded page does not contain a video player or fails to load at all. Explicitly specify 10 seconds timeout for the selector, wrap it in a try-catch block, add separate error messages for the selector and handler timeouts. 
--- potoken.py | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/potoken.py b/potoken.py index b833f95..6384b3d 100644 --- a/potoken.py +++ b/potoken.py @@ -69,7 +69,7 @@ def _extract_token(request: nodriver.cdp.network.Request) -> Optional[dict]: } return token_info - async def _update(self): + async def _update(self) -> None: if self._ongoing_update.locked(): logging.debug('update is already in progress') return @@ -82,19 +82,33 @@ async def _update(self): tab = browser.main_tab tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') - player = await tab.select("#movie_player") - await player.click() - - try: - await asyncio.wait_for(self._extraction_done.wait(), timeout=30) - except asyncio.TimeoutError: - logging.warning(f'update failed') - else: - logging.info('update was succeessful') - + player_clicked = await self._click_on_player(tab) + if player_clicked: + await self._wait_for_handler() await tab.close() browser.stop() + @staticmethod + async def _click_on_player(tab) -> bool: + try: + player = await tab.select("#movie_player", 10) + except asyncio.TimeoutError: + logging.warning(f'update failed: unable to locate video player on the page') + return False + else: + await player.click() + return True + + async def _wait_for_handler(self) -> bool: + try: + await asyncio.wait_for(self._extraction_done.wait(), timeout=30) + except asyncio.TimeoutError: + logging.warning(f'update failed: timeout waiting for outgoing API request') + return False + else: + logging.info('update was succeessful') + return True + async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): if not event.request.method == 'POST': return From c7dfbd10ec5c3bc76630a43870f923e63adef1a5 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:13:10 +0300 Subject: [PATCH 04/16] Add named 
logger in potoken.py, update usages It allows client code to control logging format and level when the module is used as a library and not a standalone script. --- potoken.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/potoken.py b/potoken.py index 6384b3d..3c7b6d6 100644 --- a/potoken.py +++ b/potoken.py @@ -10,6 +10,8 @@ import nodriver +logger = logging.getLogger('potoken') + class PotokenExtractor: @@ -34,22 +36,22 @@ async def run(self): while True: try: await asyncio.wait_for(self._update_requested.wait(), timeout=self.update_interval) - logging.debug('initiating force update') + logger.debug('initiating force update') except asyncio.TimeoutError: - logging.debug('initiating scheduled update') + logger.debug('initiating scheduled update') await self._update() self._update_requested.clear() def request_update(self) -> bool: """Request immediate update, return False if update request is already set""" if self._ongoing_update.locked(): - logging.debug('update process is already running') + logger.debug('update process is already running') return False if self._update_requested.is_set(): - logging.debug('force update has already been requested') + logger.debug('force update has already been requested') return False self._loop.call_soon_threadsafe(self._update_requested.set) - logging.debug('force update requested') + logger.debug('force update requested') return True @staticmethod @@ -60,7 +62,7 @@ def _extract_token(request: nodriver.cdp.network.Request) -> Optional[dict]: visitor_data = post_data_json['context']['client']['visitorData'] potoken = post_data_json['serviceIntegrityDimensions']['poToken'] except (json.JSONDecodeError, TypeError, KeyError) as e: - logging.warning(f'failed to extract token from request: {type(e)}, {e}') + logger.warning(f'failed to extract token from request: {type(e)}, {e}') return None token_info = { 'updated': int(time.time()), @@ -71,11 +73,11 @@ def _extract_token(request: 
nodriver.cdp.network.Request) -> Optional[dict]: async def _update(self) -> None: if self._ongoing_update.locked(): - logging.debug('update is already in progress') + logger.debug('update is already in progress') return async with self._ongoing_update: - logging.info(f'update started') + logger.info(f'update started') self._extraction_done.clear() browser = await nodriver.start(headless=False, user_data_dir=self.profile_path) @@ -93,7 +95,7 @@ async def _click_on_player(tab) -> bool: try: player = await tab.select("#movie_player", 10) except asyncio.TimeoutError: - logging.warning(f'update failed: unable to locate video player on the page') + logger.warning(f'update failed: unable to locate video player on the page') return False else: await player.click() @@ -103,10 +105,10 @@ async def _wait_for_handler(self) -> bool: try: await asyncio.wait_for(self._extraction_done.wait(), timeout=30) except asyncio.TimeoutError: - logging.warning(f'update failed: timeout waiting for outgoing API request') + logger.warning(f'update failed: timeout waiting for outgoing API request') return False else: - logging.info('update was succeessful') + logger.info('update was succeessful') return True async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): @@ -117,7 +119,7 @@ async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): token_info = self._extract_token(event.request) if token_info is None: return - logging.info(f'new token: {token_info}') + logger.info(f'new token: {token_info}') self._token_info = token_info self._extraction_done.set() @@ -179,7 +181,7 @@ def app(self, environ, start_response): return [page.encode('utf8')] def run(self): - logging.info(f'Starting web-server at {self.bind_address}:{self.port}') + logger.info(f'Starting web-server at {self.bind_address}:{self.port}') self._httpd = make_server(self.bind_address, self.port, self.app, ThreadingWSGIServer) with self._httpd: self._httpd.serve_forever() @@ -207,7 +209,7 @@ 
def main(update_interval, bind_address, port) -> None: # to ensure process exit code is 1 on error raise except (KeyboardInterrupt, asyncio.CancelledError): - logging.info('Stopping...') + logger.info('Stopping...') finally: potoken_server.stop() From 92191f4b0de98998a645a540cfe0971767606c7e Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:13:19 +0300 Subject: [PATCH 05/16] Abort token update if it takes longer than ten minutes If connection to the launched browser gets interrupted or the browser itself exits abruptly before the video tab got loaded, the update procedure might hang up indefinitely. --- potoken.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/potoken.py b/potoken.py index 3c7b6d6..8542b3d 100644 --- a/potoken.py +++ b/potoken.py @@ -72,6 +72,12 @@ def _extract_token(request: nodriver.cdp.network.Request) -> Optional[dict]: return token_info async def _update(self) -> None: + try: + await asyncio.wait_for(self._perform_update(), timeout=600) + except asyncio.TimeoutError: + logger.error(f'update failed: hard limit timeout exceeded. 
Browser might be failing to start properly') + + async def _perform_update(self) -> None: if self._ongoing_update.locked(): logger.debug('update is already in progress') return From 14a340db70b1f2db36159f9c2917d469a595f695 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 16:13:22 +0300 Subject: [PATCH 06/16] Update index.py to use potoken.py code --- index.py | 60 +++++++++++++++++++++++++------------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/index.py b/index.py index 64a61ed..0a9496f 100644 --- a/index.py +++ b/index.py @@ -1,37 +1,31 @@ -import asyncio -from nodriver import start, cdp, loop -import time -import json +import logging import sys -async def main(): - browser = await start(headless=False) - print("[INFO] launching browser.") - tab = browser.main_tab - tab.add_handler(cdp.network.RequestWillBeSent, send_handler) - page = await browser.get('https://www.youtube.com/embed/jNQXAC9IVRw') - await tab.wait(cdp.network.RequestWillBeSent) - print("[INFO] waiting 10 seconds for the page to fully load.") - await tab.sleep(10) - button_play = await tab.select("#movie_player") - await button_play.click() - await tab.wait(cdp.network.RequestWillBeSent) - print("[INFO] waiting additional 30 seconds for slower connections.") - await tab.sleep(30) - -async def send_handler(event: cdp.network.RequestWillBeSent): - if "/youtubei/v1/player" in event.request.url: - post_data = event.request.post_data - post_data_json = json.loads(post_data) - visitor_data = post_data_json["context"]["client"]["visitorData"] - po_token = post_data_json["serviceIntegrityDimensions"]["poToken"] - print("visitor_data: " + visitor_data) - print("po_token: " + po_token) - if len(po_token) < 160: - print("[WARNING] there is a high chance that the potoken generated won't work. 
please try again on another internet connection.") - sys.exit(0) - return +import nodriver -if __name__ == '__main__': +from potoken import PotokenExtractor, set_logging + + +async def main(loop): + set_logging(logging.WARNING) + logger = logging.getLogger('index') + + extractor = PotokenExtractor(loop) + token_info = await extractor.run_once() + if token_info is None: + logger.warning('failed to extract token') + sys.exit(1) + visitor_data = token_info['visitor_data'] + po_token = token_info['potoken'] - loop().run_until_complete(main()) \ No newline at end of file + print('visitor_data: ' + visitor_data) + print('po_token: ' + po_token) + if len(po_token) < 160: + logger.warning("there is a high chance that the potoken generated won't work. Please try again on another internet connection") + sys.exit(1) + sys.exit(0) + + +if __name__ == '__main__': + loop = nodriver.loop() + loop.run_until_complete(main(loop)) From b1868a9272c0a03049d5d0e5f738c968442da59c Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Tue, 17 Sep 2024 21:07:44 +0300 Subject: [PATCH 07/16] Set default token update interval to 1 hour --- potoken.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/potoken.py b/potoken.py index 8542b3d..4d14c82 100644 --- a/potoken.py +++ b/potoken.py @@ -240,7 +240,7 @@ def args_parse(): token regeneration by accessing http://127.0.0.1:8080/update ''' parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('--update-interval', '-u', type=int, default=3600 * 12, + parser.add_argument('--update-interval', '-u', type=int, default=3600, help='How ofthen new token is generated, in seconds (default: %(default)s)') parser.add_argument('--port', '-p', type=int, default=8080, help='Port webserver is listening on (default: %(default)s)') From e20e86cd1072d75358f6fee2a579ff0b3950297a Mon Sep 17 00:00:00 2001 From: 15532th 
<92187332+15532th@users.noreply.github.com> Date: Wed, 18 Sep 2024 12:42:25 +0300 Subject: [PATCH 08/16] Change cold start error status code for the /token endpoint to 503 --- potoken.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/potoken.py b/potoken.py index 4d14c82..38d6573 100644 --- a/potoken.py +++ b/potoken.py @@ -146,7 +146,7 @@ def __init__(self, potoken_extractor: PotokenExtractor, port: int = 8080, bind_a def get_potoken(self) -> Tuple[str, list, str]: token = self._potoken_extractor.get() if token is None: - status = '502 Bad Gateway' + status = '503 Service Unavailable' headers = [("Content-Type", "text/plain")] page = 'Token has not yet been generated, try again later.' else: From 889f55f21bddca6fb8177ad2f1a919a396e91a0a Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Wed, 18 Sep 2024 12:51:42 +0300 Subject: [PATCH 09/16] Fix linter warnings Remove unnecessary f-strings and double quotes, fix a few other issues. --- potoken.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/potoken.py b/potoken.py index 38d6573..f6dac24 100644 --- a/potoken.py +++ b/potoken.py @@ -15,8 +15,8 @@ class PotokenExtractor: - def __init__(self, loop: asyncio.AbstractEventLoop, update_interfal: float = 3600 * 12) -> None: - self.update_interval: float = update_interfal + def __init__(self, loop: asyncio.AbstractEventLoop, update_interval: float = 3600) -> None: + self.update_interval: float = update_interval self.profile_path = mkdtemp() # cleaned up on exit by nodriver self._loop = loop self._token_info: Optional[dict] = None @@ -75,7 +75,7 @@ async def _update(self) -> None: try: await asyncio.wait_for(self._perform_update(), timeout=600) except asyncio.TimeoutError: - logger.error(f'update failed: hard limit timeout exceeded. Browser might be failing to start properly') + logger.error('update failed: hard limit timeout exceeded. 
Browser might be failing to start properly') async def _perform_update(self) -> None: if self._ongoing_update.locked(): @@ -83,7 +83,7 @@ async def _perform_update(self) -> None: return async with self._ongoing_update: - logger.info(f'update started') + logger.info('update started') self._extraction_done.clear() browser = await nodriver.start(headless=False, user_data_dir=self.profile_path) @@ -99,9 +99,9 @@ async def _perform_update(self) -> None: @staticmethod async def _click_on_player(tab) -> bool: try: - player = await tab.select("#movie_player", 10) + player = await tab.select('#movie_player', 10) except asyncio.TimeoutError: - logger.warning(f'update failed: unable to locate video player on the page') + logger.warning('update failed: unable to locate video player on the page') return False else: await player.click() @@ -111,7 +111,7 @@ async def _wait_for_handler(self) -> bool: try: await asyncio.wait_for(self._extraction_done.wait(), timeout=30) except asyncio.TimeoutError: - logger.warning(f'update failed: timeout waiting for outgoing API request') + logger.warning('update failed: timeout waiting for outgoing API request') return False else: logger.info('update was succeessful') @@ -120,7 +120,7 @@ async def _wait_for_handler(self) -> bool: async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): if not event.request.method == 'POST': return - if not '/youtubei/v1/player' in event.request.url: + if '/youtubei/v1/player' not in event.request.url: return token_info = self._extract_token(event.request) if token_info is None: @@ -147,17 +147,17 @@ def get_potoken(self) -> Tuple[str, list, str]: token = self._potoken_extractor.get() if token is None: status = '503 Service Unavailable' - headers = [("Content-Type", "text/plain")] + headers = [('Content-Type', 'text/plain')] page = 'Token has not yet been generated, try again later.' 
else: status = '200 OK' - headers = [("Content-Type", "application/json")] + headers = [('Content-Type', 'application/json')] page = json.dumps(token) return status, headers, page def request_update(self) -> Tuple[str, list, str]: status = '200 OK' - headers = [("Content-Type", "text/plain")] + headers = [('Content-Type', 'text/plain')] accepted = self._potoken_extractor.request_update() if accepted: @@ -170,7 +170,7 @@ def request_update(self) -> Tuple[str, list, str]: def get_route_handler(self, route: str) -> Callable[[], Tuple[str, list, str]]: handlers = { # handler is a function returning a tuple of status, headers, page text - '/404': lambda: ('404 Not Found', [("Content-Type", "text/plain")], 'Not Found'), + '/404': lambda: ('404 Not Found', [('Content-Type', 'text/plain')], 'Not Found'), '/': lambda: ('302 Found', [('Location', '/token')], '/token'), '/token': self.get_potoken, '/update': self.request_update @@ -201,7 +201,7 @@ def stop(self): def main(update_interval, bind_address, port) -> None: loop = nodriver.loop() - potoken_extractor = PotokenExtractor(loop, update_interfal=update_interval) + potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) extractor_task = loop.create_task(potoken_extractor.run()) From f7b9ba59f408d022e65a9ec8989cb9934668f614 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Wed, 18 Sep 2024 13:33:24 +0300 Subject: [PATCH 10/16] Add missing typehints --- potoken.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/potoken.py b/potoken.py index f6dac24..7dd54ee 100644 --- a/potoken.py +++ b/potoken.py @@ -5,7 +5,7 @@ import time from socketserver import ThreadingMixIn from tempfile import mkdtemp -from typing import Callable, Optional, Tuple +from typing import Any, Callable, Dict, Optional, Tuple from wsgiref.simple_server import 
WSGIServer, make_server import nodriver @@ -31,7 +31,7 @@ async def run_once(self) -> Optional[dict]: await self._update() return self.get() - async def run(self): + async def run(self) -> None: await self._update() while True: try: @@ -97,7 +97,7 @@ async def _perform_update(self) -> None: browser.stop() @staticmethod - async def _click_on_player(tab) -> bool: + async def _click_on_player(tab: nodriver.Tab) -> bool: try: player = await tab.select('#movie_player', 10) except asyncio.TimeoutError: @@ -117,7 +117,7 @@ async def _wait_for_handler(self) -> bool: logger.info('update was succeessful') return True - async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): + async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent) -> None: if not event.request.method == 'POST': return if '/youtubei/v1/player' not in event.request.url: @@ -132,12 +132,12 @@ async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent): class ThreadingWSGIServer(WSGIServer, ThreadingMixIn): """Thread per request HTTP server.""" - daemon_threads = True + daemon_threads: bool = True class PotokenServer: - def __init__(self, potoken_extractor: PotokenExtractor, port: int = 8080, bind_address: str = '0.0.0.0'): + def __init__(self, potoken_extractor: PotokenExtractor, port: int = 8080, bind_address: str = '0.0.0.0') -> None: self.port = port self.bind_address = bind_address self._potoken_extractor = potoken_extractor @@ -177,7 +177,7 @@ def get_route_handler(self, route: str) -> Callable[[], Tuple[str, list, str]]: } return handlers.get(route) or handlers['/404'] - def app(self, environ, start_response): + def app(self, environ: Dict[str, Any], start_response): route = environ['PATH_INFO'] handler = self.get_route_handler(route) @@ -186,19 +186,19 @@ def app(self, environ, start_response): start_response(status, headers) return [page.encode('utf8')] - def run(self): + def run(self) -> None: logger.info(f'Starting web-server at 
{self.bind_address}:{self.port}') self._httpd = make_server(self.bind_address, self.port, self.app, ThreadingWSGIServer) with self._httpd: self._httpd.serve_forever() - def stop(self): + def stop(self) -> None: if self._httpd is None: return self._httpd.shutdown() -def main(update_interval, bind_address, port) -> None: +def main(update_interval: int, bind_address: str, port: int) -> None: loop = nodriver.loop() potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) @@ -220,7 +220,7 @@ def main(update_interval, bind_address, port) -> None: potoken_server.stop() -def set_logging(log_level=logging.DEBUG): +def set_logging(log_level: int = logging.DEBUG) -> None: log_format = '%(asctime)s.%(msecs)03d [%(name)s] [%(levelname)s] %(message)s' datefmt = '%Y/%m/%d %H:%M:%S' logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt) @@ -230,7 +230,7 @@ def set_logging(log_level=logging.DEBUG): logging.getLogger('websockets').setLevel(logging.WARNING) -def args_parse(): +def args_parse() -> argparse.Namespace: description = ''' Retrieve potoken using Chromium runned by nodriver, serve it on a json endpoint From 07ab76b7945fd01bc79ae2e2b258e175ffd31470 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Wed, 18 Sep 2024 14:16:37 +0300 Subject: [PATCH 11/16] Use dataclass to represent token_info --- index.py | 4 ++-- potoken.py | 36 +++++++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 13 deletions(-) diff --git a/index.py b/index.py index 0a9496f..c989044 100644 --- a/index.py +++ b/index.py @@ -15,8 +15,8 @@ async def main(loop): if token_info is None: logger.warning('failed to extract token') sys.exit(1) - visitor_data = token_info['visitor_data'] - po_token = token_info['potoken'] + visitor_data = token_info.visitor_data + po_token = token_info.potoken print('visitor_data: ' + visitor_data) print('po_token: ' + po_token) diff --git a/potoken.py b/potoken.py index 7dd54ee..44717af 
100644 --- a/potoken.py +++ b/potoken.py @@ -1,8 +1,10 @@ import argparse import asyncio +import dataclasses import json import logging import time +from dataclasses import dataclass from socketserver import ThreadingMixIn from tempfile import mkdtemp from typing import Any, Callable, Dict, Optional, Tuple @@ -13,21 +15,33 @@ logger = logging.getLogger('potoken') +@dataclass +class TokenInfo: + updated: int + potoken: str + visitor_data: str + + def to_json(self) -> str: + as_dict = dataclasses.asdict(self) + as_json = json.dumps(as_dict) + return as_json + + class PotokenExtractor: def __init__(self, loop: asyncio.AbstractEventLoop, update_interval: float = 3600) -> None: self.update_interval: float = update_interval self.profile_path = mkdtemp() # cleaned up on exit by nodriver self._loop = loop - self._token_info: Optional[dict] = None + self._token_info: Optional[TokenInfo] = None self._ongoing_update: asyncio.Lock = asyncio.Lock() self._extraction_done: asyncio.Event = asyncio.Event() self._update_requested: asyncio.Event = asyncio.Event() - def get(self) -> Optional[dict]: + def get(self) -> Optional[TokenInfo]: return self._token_info - async def run_once(self) -> Optional[dict]: + async def run_once(self) -> Optional[TokenInfo]: await self._update() return self.get() @@ -55,7 +69,7 @@ def request_update(self) -> bool: return True @staticmethod - def _extract_token(request: nodriver.cdp.network.Request) -> Optional[dict]: + def _extract_token(request: nodriver.cdp.network.Request) -> Optional[TokenInfo]: post_data = request.post_data try: post_data_json = json.loads(post_data) @@ -64,11 +78,11 @@ def _extract_token(request: nodriver.cdp.network.Request) -> Optional[dict]: except (json.JSONDecodeError, TypeError, KeyError) as e: logger.warning(f'failed to extract token from request: {type(e)}, {e}') return None - token_info = { - 'updated': int(time.time()), - 'potoken': potoken, - 'visitor_data': visitor_data - } + token_info = TokenInfo( + 
updated=int(time.time()), + potoken=potoken, + visitor_data=visitor_data + ) return token_info async def _update(self) -> None: @@ -125,7 +139,7 @@ async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent) -> token_info = self._extract_token(event.request) if token_info is None: return - logger.info(f'new token: {token_info}') + logger.info(f'new token: {token_info.to_json()}') self._token_info = token_info self._extraction_done.set() @@ -152,7 +166,7 @@ def get_potoken(self) -> Tuple[str, list, str]: else: status = '200 OK' headers = [('Content-Type', 'application/json')] - page = json.dumps(token) + page = token.to_json() return status, headers, page def request_update(self) -> Tuple[str, list, str]: From 167d5ea9ae0e7bde09590c491c9923554e1e331a Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Thu, 19 Sep 2024 01:56:54 +0300 Subject: [PATCH 12/16] Duplicate index.py functionality in potoken.py --- index.py | 31 ------------------------------- potoken.py | 42 ++++++++++++++++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 39 deletions(-) delete mode 100644 index.py diff --git a/index.py b/index.py deleted file mode 100644 index c989044..0000000 --- a/index.py +++ /dev/null @@ -1,31 +0,0 @@ -import logging -import sys - -import nodriver - -from potoken import PotokenExtractor, set_logging - - -async def main(loop): - set_logging(logging.WARNING) - logger = logging.getLogger('index') - - extractor = PotokenExtractor(loop) - token_info = await extractor.run_once() - if token_info is None: - logger.warning('failed to extract token') - sys.exit(1) - visitor_data = token_info.visitor_data - po_token = token_info.potoken - - print('visitor_data: ' + visitor_data) - print('po_token: ' + po_token) - if len(po_token) < 160: - logger.warning("there is a high chance that the potoken generated won't work. 
Please try again on another internet connection") - sys.exit(1) - sys.exit(0) - - -if __name__ == '__main__': - loop = nodriver.loop() - loop.run_until_complete(main(loop)) diff --git a/potoken.py b/potoken.py index 44717af..38937e7 100644 --- a/potoken.py +++ b/potoken.py @@ -3,6 +3,7 @@ import dataclasses import json import logging +import sys import time from dataclasses import dataclass from socketserver import ThreadingMixIn @@ -212,18 +213,35 @@ def stop(self) -> None: self._httpd.shutdown() -def main(update_interval: int, bind_address: str, port: int) -> None: - loop = nodriver.loop() +def print_token_and_exit(token_info: Optional[TokenInfo]): + if token_info is None: + logger.warning('failed to extract token') + sys.exit(1) + visitor_data = token_info.visitor_data + po_token = token_info.potoken + + print('visitor_data: ' + visitor_data) + print('po_token: ' + po_token) + if len(po_token) < 160: + logger.warning("there is a high chance that the potoken generated won't work. Please try again on another internet connection") + sys.exit(1) + sys.exit(0) + + +async def run(loop: asyncio.AbstractEventLoop, oneshot: bool, + update_interval: int, bind_address: str, port: int) -> None: potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) - potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) + if oneshot: + token = await potoken_extractor.run_once() + print_token_and_exit(token) extractor_task = loop.create_task(potoken_extractor.run()) + potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) server_task = loop.create_task(asyncio.to_thread(potoken_server.run)) try: - main_task = asyncio.gather(extractor_task, server_task) - loop.run_until_complete(main_task) + await asyncio.gather(extractor_task, server_task) except Exception: # exceptions raised by the tasks are intentionally propogated # to ensure process exit code is 1 on error @@ -254,6 +272,8 @@ def args_parse() -> 
argparse.Namespace: token regeneration by accessing http://127.0.0.1:8080/update ''' parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('-o', '--oneshot', action='store_true', default=False, + help='Do not start server. Generate token once, print it and exit') parser.add_argument('--update-interval', '-u', type=int, default=3600, help='How ofthen new token is generated, in seconds (default: %(default)s)') parser.add_argument('--port', '-p', type=int, default=8080, @@ -263,7 +283,13 @@ def args_parse() -> argparse.Namespace: return parser.parse_args() -if __name__ == '__main__': - set_logging() +def main() -> None: args = args_parse() - main(update_interval=args.update_interval, bind_address=args.bind, port=args.port) + set_logging(logging.WARNING if args.oneshot else logging.INFO) + loop = nodriver.loop() + main_task = run(loop, oneshot=args.oneshot, update_interval=args.update_interval, bind_address=args.bind, port=args.port) + loop.run_until_complete(main_task) + + +if __name__ == '__main__': + main() From 722fd476fe03f09f00c02fa7281d9cb62bb5746c Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Thu, 19 Sep 2024 02:36:22 +0300 Subject: [PATCH 13/16] Split potoken.py into multiple files --- README.md | 2 +- docker/scripts/startup.sh | 4 +- potoken-generator.py | 4 + potoken.py | 295 --------------------------------- potoken_generator/__init__.py | 0 potoken_generator/extractor.py | 141 ++++++++++++++++ potoken_generator/main.py | 89 ++++++++++ potoken_generator/server.py | 76 +++++++++ 8 files changed, 313 insertions(+), 298 deletions(-) create mode 100644 potoken-generator.py delete mode 100644 potoken.py create mode 100644 potoken_generator/__init__.py create mode 100644 potoken_generator/extractor.py create mode 100644 potoken_generator/main.py create mode 100644 potoken_generator/server.py diff --git a/README.md b/README.md index 
986df16..a84f8c4 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ These identity tokens (po_token and visitor_data) generated using this tool will 2. Create a new virtualenv: `virtualenv venv` 3. Activate the virtualenv: `source venv/bin/activate` 4. Install the dependencies: `pip install -r requirements.txt` -5. Run the script: `python index.py` +5. Run the script: `python potoken-generator.py --oneshot` 6. Copy paste the values of these the two parameters (po_token and visitor_data) in config.yaml ``` po_token: XXX diff --git a/docker/scripts/startup.sh b/docker/scripts/startup.sh index 9ba57f1..8332aa2 100755 --- a/docker/scripts/startup.sh +++ b/docker/scripts/startup.sh @@ -10,5 +10,5 @@ sleep 2 echo "[INFO] launching chromium instance" -# Run python script on display 99 -DISPLAY=:99 python index.py +# Run python script on display 99 +DISPLAY=:99 python potoken-generator.py --oneshot diff --git a/potoken-generator.py b/potoken-generator.py new file mode 100644 index 0000000..9f061f7 --- /dev/null +++ b/potoken-generator.py @@ -0,0 +1,4 @@ +import potoken_generator.main + +if __name__ == '__main__': + potoken_generator.main.main() diff --git a/potoken.py b/potoken.py deleted file mode 100644 index 38937e7..0000000 --- a/potoken.py +++ /dev/null @@ -1,295 +0,0 @@ -import argparse -import asyncio -import dataclasses -import json -import logging -import sys -import time -from dataclasses import dataclass -from socketserver import ThreadingMixIn -from tempfile import mkdtemp -from typing import Any, Callable, Dict, Optional, Tuple -from wsgiref.simple_server import WSGIServer, make_server - -import nodriver - -logger = logging.getLogger('potoken') - - -@dataclass -class TokenInfo: - updated: int - potoken: str - visitor_data: str - - def to_json(self) -> str: - as_dict = dataclasses.asdict(self) - as_json = json.dumps(as_dict) - return as_json - - -class PotokenExtractor: - - def __init__(self, loop: asyncio.AbstractEventLoop, update_interval: float = 3600)
-> None: - self.update_interval: float = update_interval - self.profile_path = mkdtemp() # cleaned up on exit by nodriver - self._loop = loop - self._token_info: Optional[TokenInfo] = None - self._ongoing_update: asyncio.Lock = asyncio.Lock() - self._extraction_done: asyncio.Event = asyncio.Event() - self._update_requested: asyncio.Event = asyncio.Event() - - def get(self) -> Optional[TokenInfo]: - return self._token_info - - async def run_once(self) -> Optional[TokenInfo]: - await self._update() - return self.get() - - async def run(self) -> None: - await self._update() - while True: - try: - await asyncio.wait_for(self._update_requested.wait(), timeout=self.update_interval) - logger.debug('initiating force update') - except asyncio.TimeoutError: - logger.debug('initiating scheduled update') - await self._update() - self._update_requested.clear() - - def request_update(self) -> bool: - """Request immediate update, return False if update request is already set""" - if self._ongoing_update.locked(): - logger.debug('update process is already running') - return False - if self._update_requested.is_set(): - logger.debug('force update has already been requested') - return False - self._loop.call_soon_threadsafe(self._update_requested.set) - logger.debug('force update requested') - return True - - @staticmethod - def _extract_token(request: nodriver.cdp.network.Request) -> Optional[TokenInfo]: - post_data = request.post_data - try: - post_data_json = json.loads(post_data) - visitor_data = post_data_json['context']['client']['visitorData'] - potoken = post_data_json['serviceIntegrityDimensions']['poToken'] - except (json.JSONDecodeError, TypeError, KeyError) as e: - logger.warning(f'failed to extract token from request: {type(e)}, {e}') - return None - token_info = TokenInfo( - updated=int(time.time()), - potoken=potoken, - visitor_data=visitor_data - ) - return token_info - - async def _update(self) -> None: - try: - await asyncio.wait_for(self._perform_update(), 
timeout=600) - except asyncio.TimeoutError: - logger.error('update failed: hard limit timeout exceeded. Browser might be failing to start properly') - - async def _perform_update(self) -> None: - if self._ongoing_update.locked(): - logger.debug('update is already in progress') - return - - async with self._ongoing_update: - logger.info('update started') - self._extraction_done.clear() - - browser = await nodriver.start(headless=False, user_data_dir=self.profile_path) - tab = browser.main_tab - tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) - await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') - player_clicked = await self._click_on_player(tab) - if player_clicked: - await self._wait_for_handler() - await tab.close() - browser.stop() - - @staticmethod - async def _click_on_player(tab: nodriver.Tab) -> bool: - try: - player = await tab.select('#movie_player', 10) - except asyncio.TimeoutError: - logger.warning('update failed: unable to locate video player on the page') - return False - else: - await player.click() - return True - - async def _wait_for_handler(self) -> bool: - try: - await asyncio.wait_for(self._extraction_done.wait(), timeout=30) - except asyncio.TimeoutError: - logger.warning('update failed: timeout waiting for outgoing API request') - return False - else: - logger.info('update was succeessful') - return True - - async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent) -> None: - if not event.request.method == 'POST': - return - if '/youtubei/v1/player' not in event.request.url: - return - token_info = self._extract_token(event.request) - if token_info is None: - return - logger.info(f'new token: {token_info.to_json()}') - self._token_info = token_info - self._extraction_done.set() - - -class ThreadingWSGIServer(WSGIServer, ThreadingMixIn): - """Thread per request HTTP server.""" - daemon_threads: bool = True - - -class PotokenServer: - - def __init__(self, potoken_extractor: PotokenExtractor, 
port: int = 8080, bind_address: str = '0.0.0.0') -> None: - self.port = port - self.bind_address = bind_address - self._potoken_extractor = potoken_extractor - self._httpd: Optional[ThreadingWSGIServer] = None - - def get_potoken(self) -> Tuple[str, list, str]: - token = self._potoken_extractor.get() - if token is None: - status = '503 Service Unavailable' - headers = [('Content-Type', 'text/plain')] - page = 'Token has not yet been generated, try again later.' - else: - status = '200 OK' - headers = [('Content-Type', 'application/json')] - page = token.to_json() - return status, headers, page - - def request_update(self) -> Tuple[str, list, str]: - status = '200 OK' - headers = [('Content-Type', 'text/plain')] - - accepted = self._potoken_extractor.request_update() - if accepted: - page = 'Update request accepted, new token will be generated soon.' - else: - page = 'Update has already been requested, new token will be generated soon.' - - return status, headers, page - - def get_route_handler(self, route: str) -> Callable[[], Tuple[str, list, str]]: - handlers = { - # handler is a function returning a tuple of status, headers, page text - '/404': lambda: ('404 Not Found', [('Content-Type', 'text/plain')], 'Not Found'), - '/': lambda: ('302 Found', [('Location', '/token')], '/token'), - '/token': self.get_potoken, - '/update': self.request_update - } - return handlers.get(route) or handlers['/404'] - - def app(self, environ: Dict[str, Any], start_response): - route = environ['PATH_INFO'] - - handler = self.get_route_handler(route) - status, headers, page = handler() - - start_response(status, headers) - return [page.encode('utf8')] - - def run(self) -> None: - logger.info(f'Starting web-server at {self.bind_address}:{self.port}') - self._httpd = make_server(self.bind_address, self.port, self.app, ThreadingWSGIServer) - with self._httpd: - self._httpd.serve_forever() - - def stop(self) -> None: - if self._httpd is None: - return - self._httpd.shutdown() - - -def 
print_token_and_exit(token_info: Optional[TokenInfo]): - if token_info is None: - logger.warning('failed to extract token') - sys.exit(1) - visitor_data = token_info.visitor_data - po_token = token_info.potoken - - print('visitor_data: ' + visitor_data) - print('po_token: ' + po_token) - if len(po_token) < 160: - logger.warning("there is a high chance that the potoken generated won't work. Please try again on another internet connection") - sys.exit(1) - sys.exit(0) - - -async def run(loop: asyncio.AbstractEventLoop, oneshot: bool, - update_interval: int, bind_address: str, port: int) -> None: - - potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) - if oneshot: - token = await potoken_extractor.run_once() - print_token_and_exit(token) - - extractor_task = loop.create_task(potoken_extractor.run()) - potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) - server_task = loop.create_task(asyncio.to_thread(potoken_server.run)) - - try: - await asyncio.gather(extractor_task, server_task) - except Exception: - # exceptions raised by the tasks are intentionally propogated - # to ensure process exit code is 1 on error - raise - except (KeyboardInterrupt, asyncio.CancelledError): - logger.info('Stopping...') - finally: - potoken_server.stop() - - -def set_logging(log_level: int = logging.DEBUG) -> None: - log_format = '%(asctime)s.%(msecs)03d [%(name)s] [%(levelname)s] %(message)s' - datefmt = '%Y/%m/%d %H:%M:%S' - logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt) - logging.getLogger('asyncio').setLevel(logging.INFO) - logging.getLogger('nodriver').setLevel(logging.WARNING) - logging.getLogger('uc').setLevel(logging.WARNING) - logging.getLogger('websockets').setLevel(logging.WARNING) - - -def args_parse() -> argparse.Namespace: - description = ''' -Retrieve potoken using Chromium runned by nodriver, serve it on a json endpoint - - Token is generated on startup, and then every UPDATE_INTERVAL 
seconds. - With web-server running on default port, the token is available on the - http://127.0.0.1:8080/token endpoint. It is possible to request immediate - token regeneration by accessing http://127.0.0.1:8080/update - ''' - parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter) - parser.add_argument('-o', '--oneshot', action='store_true', default=False, - help='Do not start server. Generate token once, print it and exit') - parser.add_argument('--update-interval', '-u', type=int, default=3600, - help='How ofthen new token is generated, in seconds (default: %(default)s)') - parser.add_argument('--port', '-p', type=int, default=8080, - help='Port webserver is listening on (default: %(default)s)') - parser.add_argument('--bind', '-b', default='0.0.0.0', - help='Address webserver binds to (default: %(default)s)') - return parser.parse_args() - - -def main() -> None: - args = args_parse() - set_logging(logging.WARNING if args.oneshot else logging.INFO) - loop = nodriver.loop() - main_task = run(loop, oneshot=args.oneshot, update_interval=args.update_interval, bind_address=args.bind, port=args.port) - loop.run_until_complete(main_task) - - -if __name__ == '__main__': - main() diff --git a/potoken_generator/__init__.py b/potoken_generator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/potoken_generator/extractor.py b/potoken_generator/extractor.py new file mode 100644 index 0000000..7563cd8 --- /dev/null +++ b/potoken_generator/extractor.py @@ -0,0 +1,141 @@ +import asyncio +import dataclasses +import json +import logging +import time +from dataclasses import dataclass +from tempfile import mkdtemp +from typing import Optional + +import nodriver + +logger = logging.getLogger('extractor') + + +@dataclass +class TokenInfo: + updated: int + potoken: str + visitor_data: str + + def to_json(self) -> str: + as_dict = dataclasses.asdict(self) + as_json = json.dumps(as_dict) + return as_json + + 
+class PotokenExtractor: + + def __init__(self, loop: asyncio.AbstractEventLoop, update_interval: float = 3600) -> None: + self.update_interval: float = update_interval + self.profile_path = mkdtemp() # cleaned up on exit by nodriver + self._loop = loop + self._token_info: Optional[TokenInfo] = None + self._ongoing_update: asyncio.Lock = asyncio.Lock() + self._extraction_done: asyncio.Event = asyncio.Event() + self._update_requested: asyncio.Event = asyncio.Event() + + def get(self) -> Optional[TokenInfo]: + return self._token_info + + async def run_once(self) -> Optional[TokenInfo]: + await self._update() + return self.get() + + async def run(self) -> None: + await self._update() + while True: + try: + await asyncio.wait_for(self._update_requested.wait(), timeout=self.update_interval) + logger.debug('initiating force update') + except asyncio.TimeoutError: + logger.debug('initiating scheduled update') + await self._update() + self._update_requested.clear() + + def request_update(self) -> bool: + """Request immediate update, return False if update request is already set""" + if self._ongoing_update.locked(): + logger.debug('update process is already running') + return False + if self._update_requested.is_set(): + logger.debug('force update has already been requested') + return False + self._loop.call_soon_threadsafe(self._update_requested.set) + logger.debug('force update requested') + return True + + @staticmethod + def _extract_token(request: nodriver.cdp.network.Request) -> Optional[TokenInfo]: + post_data = request.post_data + try: + post_data_json = json.loads(post_data) + visitor_data = post_data_json['context']['client']['visitorData'] + potoken = post_data_json['serviceIntegrityDimensions']['poToken'] + except (json.JSONDecodeError, TypeError, KeyError) as e: + logger.warning(f'failed to extract token from request: {type(e)}, {e}') + return None + token_info = TokenInfo( + updated=int(time.time()), + potoken=potoken, + visitor_data=visitor_data + ) + return 
token_info + + async def _update(self) -> None: + try: + await asyncio.wait_for(self._perform_update(), timeout=600) + except asyncio.TimeoutError: + logger.error('update failed: hard limit timeout exceeded. Browser might be failing to start properly') + + async def _perform_update(self) -> None: + if self._ongoing_update.locked(): + logger.debug('update is already in progress') + return + + async with self._ongoing_update: + logger.info('update started') + self._extraction_done.clear() + + browser = await nodriver.start(headless=False, user_data_dir=self.profile_path) + tab = browser.main_tab + tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) + await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') + player_clicked = await self._click_on_player(tab) + if player_clicked: + await self._wait_for_handler() + await tab.close() + browser.stop() + + @staticmethod + async def _click_on_player(tab: nodriver.Tab) -> bool: + try: + player = await tab.select('#movie_player', 10) + except asyncio.TimeoutError: + logger.warning('update failed: unable to locate video player on the page') + return False + else: + await player.click() + return True + + async def _wait_for_handler(self) -> bool: + try: + await asyncio.wait_for(self._extraction_done.wait(), timeout=30) + except asyncio.TimeoutError: + logger.warning('update failed: timeout waiting for outgoing API request') + return False + else: + logger.info('update was succeessful') + return True + + async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent) -> None: + if not event.request.method == 'POST': + return + if '/youtubei/v1/player' not in event.request.url: + return + token_info = self._extract_token(event.request) + if token_info is None: + return + logger.info(f'new token: {token_info.to_json()}') + self._token_info = token_info + self._extraction_done.set() diff --git a/potoken_generator/main.py b/potoken_generator/main.py new file mode 100644 index 0000000..2838190 
--- /dev/null +++ b/potoken_generator/main.py @@ -0,0 +1,89 @@ +import argparse +import asyncio +import logging +import sys +from typing import Optional + +import nodriver + +from potoken_generator.extractor import PotokenExtractor, TokenInfo +from potoken_generator.server import PotokenServer + +logger = logging.getLogger('potoken') + + +def print_token_and_exit(token_info: Optional[TokenInfo]): + if token_info is None: + logger.warning('failed to extract token') + sys.exit(1) + visitor_data = token_info.visitor_data + po_token = token_info.potoken + + print('visitor_data: ' + visitor_data) + print('po_token: ' + po_token) + if len(po_token) < 160: + logger.warning("there is a high chance that the potoken generated won't work. Please try again on another internet connection") + sys.exit(1) + sys.exit(0) + + +async def run(loop: asyncio.AbstractEventLoop, oneshot: bool, + update_interval: int, bind_address: str, port: int) -> None: + potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) + if oneshot: + token = await potoken_extractor.run_once() + print_token_and_exit(token) + + extractor_task = loop.create_task(potoken_extractor.run()) + potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) + server_task = loop.create_task(asyncio.to_thread(potoken_server.run)) + + try: + await asyncio.gather(extractor_task, server_task) + except Exception: + # exceptions raised by the tasks are intentionally propogated + # to ensure process exit code is 1 on error + raise + except (KeyboardInterrupt, asyncio.CancelledError): + logger.info('Stopping...') + finally: + potoken_server.stop() + + +def set_logging(log_level: int = logging.DEBUG) -> None: + log_format = '%(asctime)s.%(msecs)03d [%(name)s] [%(levelname)s] %(message)s' + datefmt = '%Y/%m/%d %H:%M:%S' + logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt) + logging.getLogger('asyncio').setLevel(logging.INFO) + 
logging.getLogger('nodriver').setLevel(logging.WARNING) + logging.getLogger('uc').setLevel(logging.WARNING) + logging.getLogger('websockets').setLevel(logging.WARNING) + + +def args_parse() -> argparse.Namespace: + description = ''' +Retrieve potoken using Chromium runned by nodriver, serve it on a json endpoint + + Token is generated on startup, and then every UPDATE_INTERVAL seconds. + With web-server running on default port, the token is available on the + http://127.0.0.1:8080/token endpoint. It is possible to request immediate + token regeneration by accessing http://127.0.0.1:8080/update + ''' + parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter) + parser.add_argument('-o', '--oneshot', action='store_true', default=False, + help='Do not start server. Generate token once, print it and exit') + parser.add_argument('--update-interval', '-u', type=int, default=3600, + help='How ofthen new token is generated, in seconds (default: %(default)s)') + parser.add_argument('--port', '-p', type=int, default=8080, + help='Port webserver is listening on (default: %(default)s)') + parser.add_argument('--bind', '-b', default='0.0.0.0', + help='Address webserver binds to (default: %(default)s)') + return parser.parse_args() + + +def main() -> None: + args = args_parse() + set_logging(logging.WARNING if args.oneshot else logging.INFO) + loop = nodriver.loop() + main_task = run(loop, oneshot=args.oneshot, update_interval=args.update_interval, bind_address=args.bind, port=args.port) + loop.run_until_complete(main_task) diff --git a/potoken_generator/server.py b/potoken_generator/server.py new file mode 100644 index 0000000..6673322 --- /dev/null +++ b/potoken_generator/server.py @@ -0,0 +1,76 @@ +import logging +from socketserver import ThreadingMixIn +from typing import Any, Callable, Dict, Optional, Tuple +from wsgiref.simple_server import WSGIServer, make_server + +from potoken_generator.extractor import 
PotokenExtractor + +logger = logging.getLogger('server') + + +class ThreadingWSGIServer(WSGIServer, ThreadingMixIn): + """Thread per request HTTP server.""" + daemon_threads: bool = True + + +class PotokenServer: + + def __init__(self, potoken_extractor: PotokenExtractor, port: int = 8080, bind_address: str = '0.0.0.0') -> None: + self.port = port + self.bind_address = bind_address + self._potoken_extractor = potoken_extractor + self._httpd: Optional[ThreadingWSGIServer] = None + + def get_potoken(self) -> Tuple[str, list, str]: + token = self._potoken_extractor.get() + if token is None: + status = '503 Service Unavailable' + headers = [('Content-Type', 'text/plain')] + page = 'Token has not yet been generated, try again later.' + else: + status = '200 OK' + headers = [('Content-Type', 'application/json')] + page = token.to_json() + return status, headers, page + + def request_update(self) -> Tuple[str, list, str]: + status = '200 OK' + headers = [('Content-Type', 'text/plain')] + + accepted = self._potoken_extractor.request_update() + if accepted: + page = 'Update request accepted, new token will be generated soon.' + else: + page = 'Update has already been requested, new token will be generated soon.' 
+ + return status, headers, page + + def get_route_handler(self, route: str) -> Callable[[], Tuple[str, list, str]]: + handlers = { + # handler is a function returning a tuple of status, headers, page text + '/404': lambda: ('404 Not Found', [('Content-Type', 'text/plain')], 'Not Found'), + '/': lambda: ('302 Found', [('Location', '/token')], '/token'), + '/token': self.get_potoken, + '/update': self.request_update + } + return handlers.get(route) or handlers['/404'] + + def app(self, environ: Dict[str, Any], start_response): + route = environ['PATH_INFO'] + + handler = self.get_route_handler(route) + status, headers, page = handler() + + start_response(status, headers) + return [page.encode('utf8')] + + def run(self) -> None: + logger.info(f'Starting web-server at {self.bind_address}:{self.port}') + self._httpd = make_server(self.bind_address, self.port, self.app, ThreadingWSGIServer) + with self._httpd: + self._httpd.serve_forever() + + def stop(self) -> None: + if self._httpd is None: + return + self._httpd.shutdown() From 3b3a5c4b41a60d7c028d5314dbf5aaebc19253f3 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Thu, 19 Sep 2024 02:59:52 +0300 Subject: [PATCH 14/16] Run first extraction before starting webserver It leaves less time for client to hit 503 error on startup, but more importantly, it deals with race condition in startup sequence: if extractor_task gets interrupted after webserver startup was initiated but before make_server() returned, then potoken_server.stop() becomes a no-op, leaving background task with webserver to run indefinitely. 
--- potoken_generator/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/potoken_generator/main.py b/potoken_generator/main.py index 2838190..d9859cc 100644 --- a/potoken_generator/main.py +++ b/potoken_generator/main.py @@ -30,8 +30,8 @@ def print_token_and_exit(token_info: Optional[TokenInfo]): async def run(loop: asyncio.AbstractEventLoop, oneshot: bool, update_interval: int, bind_address: str, port: int) -> None: potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) + token = await potoken_extractor.run_once() if oneshot: - token = await potoken_extractor.run_once() print_token_and_exit(token) extractor_task = loop.create_task(potoken_extractor.run()) From d1560c5bf963dac756c55e89567010c9b9932eb5 Mon Sep 17 00:00:00 2001 From: 15532th <92187332+15532th@users.noreply.github.com> Date: Thu, 19 Sep 2024 03:26:38 +0300 Subject: [PATCH 15/16] Add a command line argument allowing to specify the path to a Chromium binary --- potoken_generator/extractor.py | 15 ++++++++++++--- potoken_generator/main.py | 15 ++++++++++++--- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/potoken_generator/extractor.py b/potoken_generator/extractor.py index 7563cd8..7e51e0a 100644 --- a/potoken_generator/extractor.py +++ b/potoken_generator/extractor.py @@ -4,6 +4,7 @@ import logging import time from dataclasses import dataclass +from pathlib import Path from tempfile import mkdtemp from typing import Optional @@ -26,8 +27,11 @@ def to_json(self) -> str: class PotokenExtractor: - def __init__(self, loop: asyncio.AbstractEventLoop, update_interval: float = 3600) -> None: + def __init__(self, loop: asyncio.AbstractEventLoop, + update_interval: float = 3600, + browser_path: Optional[Path] = None) -> None: self.update_interval: float = update_interval + self.browser_path: Optional[Path] = browser_path self.profile_path = mkdtemp() # cleaned up on exit by nodriver self._loop = loop self._token_info: Optional[TokenInfo] = None @@ 
-96,8 +100,13 @@ async def _perform_update(self) -> None: async with self._ongoing_update: logger.info('update started') self._extraction_done.clear() - - browser = await nodriver.start(headless=False, user_data_dir=self.profile_path) + try: + browser = await nodriver.start(headless=False, + browser_executable_path=self.browser_path, + user_data_dir=self.profile_path) + except FileNotFoundError as e: + msg = "could not find Chromium. Make sure it's installed or provide direct path to the executable" + raise FileNotFoundError(msg) from e tab = browser.main_tab tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') diff --git a/potoken_generator/main.py b/potoken_generator/main.py index d9859cc..0f88a81 100644 --- a/potoken_generator/main.py +++ b/potoken_generator/main.py @@ -2,6 +2,7 @@ import asyncio import logging import sys +from pathlib import Path from typing import Optional import nodriver @@ -28,8 +29,9 @@ def print_token_and_exit(token_info: Optional[TokenInfo]): async def run(loop: asyncio.AbstractEventLoop, oneshot: bool, - update_interval: int, bind_address: str, port: int) -> None: - potoken_extractor = PotokenExtractor(loop, update_interval=update_interval) + update_interval: int, bind_address: str, port: int, + browser_path: Optional[Path] = None) -> None: + potoken_extractor = PotokenExtractor(loop, update_interval=update_interval, browser_path=browser_path) token = await potoken_extractor.run_once() if oneshot: print_token_and_exit(token) @@ -78,6 +80,8 @@ def args_parse() -> argparse.Namespace: help='Port webserver is listening on (default: %(default)s)') parser.add_argument('--bind', '-b', default='0.0.0.0', help='Address webserver binds to (default: %(default)s)') + parser.add_argument('--chrome-path', '-c', type=Path, default=None, + help='Path to the Chromiun executable') return parser.parse_args() @@ -85,5 +89,10 @@ def main() -> None: args = args_parse() 
set_logging(logging.WARNING if args.oneshot else logging.INFO) loop = nodriver.loop() - main_task = run(loop, oneshot=args.oneshot, update_interval=args.update_interval, bind_address=args.bind, port=args.port) + main_task = run(loop, oneshot=args.oneshot, + update_interval=args.update_interval, + bind_address=args.bind, + port=args.port, + browser_path=args.chrome_path + ) loop.run_until_complete(main_task) From 9edde11f515edb64d324fd8fffa2673ea36249e9 Mon Sep 17 00:00:00 2001 From: Emilien <4016501+unixfox@users.noreply.github.com> Date: Sun, 17 Nov 2024 15:50:52 +0100 Subject: [PATCH 16/16] bind webserver localhost + specific Dockerfile webserver + reformat doc --- Dockerfile.webserver | 3 ++ README.md | 47 +++++++++++++++++++++-------- docker/scripts/startup-webserver.sh | 14 +++++++++ potoken_generator/main.py | 2 +- 4 files changed, 52 insertions(+), 14 deletions(-) create mode 100644 Dockerfile.webserver create mode 100755 docker/scripts/startup-webserver.sh diff --git a/Dockerfile.webserver b/Dockerfile.webserver new file mode 100644 index 0000000..b8ebb83 --- /dev/null +++ b/Dockerfile.webserver @@ -0,0 +1,3 @@ +FROM quay.io/invidious/youtube-trusted-session-generator:latest + +COPY docker/scripts/startup-webserver.sh ./ diff --git a/README.md b/README.md index a84f8c4..fea4caa 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ ## Description -This script will output two parameters: po_token and visitor_data. Needed for passing YouTube checks in Invidious. +This script will output two parameters: po_token and visitor_data. Needed for passing YouTube checks in Invidious or any other program that uses the po_token functionality. ## What's po_token @@ -15,7 +15,18 @@ These identity tokens (po_token and visitor_data) generated using this tool will - You have to run this command on the same public IP address as the one blocked by YouTube. Not necessarily the same machine, just the same public IP address.
Subsequent usage of this same token will work on the same IP range or even the same ASN. The point is to generate this token on a blocked IP as "unblocked" IP addresses seems to not generate a token valid for passing the checks on a blocked IP. -## Tutorial without Docker +## Tutorials for "oneshot" command: run the program and get the po_token and visitor_data values + +### Tutorial with Docker +1. Run the script: `docker run quay.io/invidious/youtube-trusted-session-generator` +2. Copy and paste the values of these two parameters (po_token and visitor_data) into config.yaml + ``` + po_token: XXX + visitor_data: XXX + ``` +3. Restart Invidious or the program that uses the po_token functionality. + +### Tutorial without Docker 1. Install Chromium or Google Chrome. 2. Create a new virtualenv: `virtualenv venv` 3. Activate the virtualenv: `source venv/bin/activate` @@ -26,17 +37,27 @@ These identity tokens (po_token and visitor_data) generated using this tool will po_token: XXX visitor_data: XXX ``` -7. Restart Invidious. +7. Restart Invidious or the program that uses the po_token functionality. -## Tutorial with Docker -1. Run the script: `docker run quay.io/invidious/youtube-trusted-session-generator` -2. Copy paste the values of these the two parameters (po_token and visitor_data) in config.yaml - ``` - po_token: XXX - visitor_data: XXX - ``` -3. Restart Invidious. -## Why running as root for Docker? +### Why running as root for Docker? + +In "headless: false" mode, Chromium does not support sandboxing when it is not run by the root user. + +## Tutorials for "always running" program: Get po_token on demand using HTTP. + +### Tutorial with Docker +Run the program: `docker run -p 8080:8080 quay.io/invidious/youtube-trusted-session-generator:webserver` + +### Tutorial without Docker +1. Install Chromium or Google Chrome. +2. Create a new virtualenv: `virtualenv venv` +3. Activate the virtualenv: `source venv/bin/activate` +4.
Install the dependencies: `pip install -r requirements.txt` +5. Run the program: `python potoken-generator.py` + +### Usage of the HTTP API + +Send your requests to http://localhost:8080/token in order to obtain your po_token. -In "headless: false", Chromium does not support sanboxing when it is not ran by root user. \ No newline at end of file +You can also force refresh the po_token in the cache by sending a request to http://localhost:8080/update. \ No newline at end of file diff --git a/docker/scripts/startup-webserver.sh b/docker/scripts/startup-webserver.sh new file mode 100755 index 0000000..f692d17 --- /dev/null +++ b/docker/scripts/startup-webserver.sh @@ -0,0 +1,14 @@ +#!/bin/sh + +echo "[INFO] internally launching GUI (X11 environment)" + +XVFB_WHD=${XVFB_WHD:-1280x720x16} + +echo "[INFO] starting Xvfb" +Xvfb :99 -ac -screen 0 $XVFB_WHD -nolisten tcp > /dev/null 2>&1 & +sleep 2 + +echo "[INFO] launching chromium instance" + +# Run python script on the Xvfb display (:99) +DISPLAY=:99 python potoken-generator.py --bind 0.0.0.0 diff --git a/potoken_generator/main.py b/potoken_generator/main.py index 0f88a81..c25e8ba 100644 --- a/potoken_generator/main.py +++ b/potoken_generator/main.py @@ -78,7 +78,7 @@ def args_parse() -> argparse.Namespace: help='How ofthen new token is generated, in seconds (default: %(default)s)') parser.add_argument('--port', '-p', type=int, default=8080, help='Port webserver is listening on (default: %(default)s)') - parser.add_argument('--bind', '-b', default='0.0.0.0', + parser.add_argument('--bind', '-b', default='127.0.0.1', help='Address webserver binds to (default: %(default)s)') parser.add_argument('--chrome-path', '-c', type=Path, default=None, help='Path to the Chromiun executable')