-
Notifications
You must be signed in to change notification settings - Fork 17
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
10 changed files
with
382 additions
and
53 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
FROM quay.io/invidious/youtube-trusted-session-generator:latest | ||
|
||
COPY docker/scripts/startup-webserver.sh ./ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
#!/bin/sh | ||
|
||
echo "[INFO] internally launching GUI (X11 environment)" | ||
|
||
XVFB_WHD=${XVFB_WHD:-1280x720x16} | ||
|
||
echo "[INFO] starting Xvfb" | ||
Xvfb :99 -ac -screen 0 $XVFB_WHD -nolisten tcp > /dev/null 2>&1 & | ||
sleep 2 | ||
|
||
echo "[INFO] launching chromium instance" | ||
|
||
# Run python script on display 0 | ||
DISPLAY=:99 python potoken-generator.py --bind 0.0.0.0 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,4 @@ | ||
import potoken_generator.main | ||
|
||
if __name__ == '__main__': | ||
potoken_generator.main.main() |
Empty file.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,150 @@ | ||
import asyncio | ||
import dataclasses | ||
import json | ||
import logging | ||
import time | ||
from dataclasses import dataclass | ||
from pathlib import Path | ||
from tempfile import mkdtemp | ||
from typing import Optional | ||
|
||
import nodriver | ||
|
||
logger = logging.getLogger('extractor') | ||
|
||
|
||
@dataclass | ||
class TokenInfo: | ||
updated: int | ||
potoken: str | ||
visitor_data: str | ||
|
||
def to_json(self) -> str: | ||
as_dict = dataclasses.asdict(self) | ||
as_json = json.dumps(as_dict) | ||
return as_json | ||
|
||
|
||
class PotokenExtractor: | ||
|
||
def __init__(self, loop: asyncio.AbstractEventLoop, | ||
update_interval: float = 3600, | ||
browser_path: Optional[Path] = None) -> None: | ||
self.update_interval: float = update_interval | ||
self.browser_path: Optional[Path] = browser_path | ||
self.profile_path = mkdtemp() # cleaned up on exit by nodriver | ||
self._loop = loop | ||
self._token_info: Optional[TokenInfo] = None | ||
self._ongoing_update: asyncio.Lock = asyncio.Lock() | ||
self._extraction_done: asyncio.Event = asyncio.Event() | ||
self._update_requested: asyncio.Event = asyncio.Event() | ||
|
||
def get(self) -> Optional[TokenInfo]: | ||
return self._token_info | ||
|
||
async def run_once(self) -> Optional[TokenInfo]: | ||
await self._update() | ||
return self.get() | ||
|
||
async def run(self) -> None: | ||
await self._update() | ||
while True: | ||
try: | ||
await asyncio.wait_for(self._update_requested.wait(), timeout=self.update_interval) | ||
logger.debug('initiating force update') | ||
except asyncio.TimeoutError: | ||
logger.debug('initiating scheduled update') | ||
await self._update() | ||
self._update_requested.clear() | ||
|
||
def request_update(self) -> bool: | ||
"""Request immediate update, return False if update request is already set""" | ||
if self._ongoing_update.locked(): | ||
logger.debug('update process is already running') | ||
return False | ||
if self._update_requested.is_set(): | ||
logger.debug('force update has already been requested') | ||
return False | ||
self._loop.call_soon_threadsafe(self._update_requested.set) | ||
logger.debug('force update requested') | ||
return True | ||
|
||
@staticmethod | ||
def _extract_token(request: nodriver.cdp.network.Request) -> Optional[TokenInfo]: | ||
post_data = request.post_data | ||
try: | ||
post_data_json = json.loads(post_data) | ||
visitor_data = post_data_json['context']['client']['visitorData'] | ||
potoken = post_data_json['serviceIntegrityDimensions']['poToken'] | ||
except (json.JSONDecodeError, TypeError, KeyError) as e: | ||
logger.warning(f'failed to extract token from request: {type(e)}, {e}') | ||
return None | ||
token_info = TokenInfo( | ||
updated=int(time.time()), | ||
potoken=potoken, | ||
visitor_data=visitor_data | ||
) | ||
return token_info | ||
|
||
async def _update(self) -> None: | ||
try: | ||
await asyncio.wait_for(self._perform_update(), timeout=600) | ||
except asyncio.TimeoutError: | ||
logger.error('update failed: hard limit timeout exceeded. Browser might be failing to start properly') | ||
|
||
async def _perform_update(self) -> None: | ||
if self._ongoing_update.locked(): | ||
logger.debug('update is already in progress') | ||
return | ||
|
||
async with self._ongoing_update: | ||
logger.info('update started') | ||
self._extraction_done.clear() | ||
try: | ||
browser = await nodriver.start(headless=False, | ||
browser_executable_path=self.browser_path, | ||
user_data_dir=self.profile_path) | ||
except FileNotFoundError as e: | ||
msg = "could not find Chromium. Make sure it's installed or provide direct path to the executable" | ||
raise FileNotFoundError(msg) from e | ||
tab = browser.main_tab | ||
tab.add_handler(nodriver.cdp.network.RequestWillBeSent, self._send_handler) | ||
await tab.get('https://www.youtube.com/embed/jNQXAC9IVRw') | ||
player_clicked = await self._click_on_player(tab) | ||
if player_clicked: | ||
await self._wait_for_handler() | ||
await tab.close() | ||
browser.stop() | ||
|
||
@staticmethod | ||
async def _click_on_player(tab: nodriver.Tab) -> bool: | ||
try: | ||
player = await tab.select('#movie_player', 10) | ||
except asyncio.TimeoutError: | ||
logger.warning('update failed: unable to locate video player on the page') | ||
return False | ||
else: | ||
await player.click() | ||
return True | ||
|
||
async def _wait_for_handler(self) -> bool: | ||
try: | ||
await asyncio.wait_for(self._extraction_done.wait(), timeout=30) | ||
except asyncio.TimeoutError: | ||
logger.warning('update failed: timeout waiting for outgoing API request') | ||
return False | ||
else: | ||
logger.info('update was succeessful') | ||
return True | ||
|
||
async def _send_handler(self, event: nodriver.cdp.network.RequestWillBeSent) -> None: | ||
if not event.request.method == 'POST': | ||
return | ||
if '/youtubei/v1/player' not in event.request.url: | ||
return | ||
token_info = self._extract_token(event.request) | ||
if token_info is None: | ||
return | ||
logger.info(f'new token: {token_info.to_json()}') | ||
self._token_info = token_info | ||
self._extraction_done.set() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import argparse | ||
import asyncio | ||
import logging | ||
import sys | ||
from pathlib import Path | ||
from typing import Optional | ||
|
||
import nodriver | ||
|
||
from potoken_generator.extractor import PotokenExtractor, TokenInfo | ||
from potoken_generator.server import PotokenServer | ||
|
||
logger = logging.getLogger('potoken') | ||
|
||
|
||
def print_token_and_exit(token_info: Optional[TokenInfo]): | ||
if token_info is None: | ||
logger.warning('failed to extract token') | ||
sys.exit(1) | ||
visitor_data = token_info.visitor_data | ||
po_token = token_info.potoken | ||
|
||
print('visitor_data: ' + visitor_data) | ||
print('po_token: ' + po_token) | ||
if len(po_token) < 160: | ||
logger.warning("there is a high chance that the potoken generated won't work. Please try again on another internet connection") | ||
sys.exit(1) | ||
sys.exit(0) | ||
|
||
|
||
async def run(loop: asyncio.AbstractEventLoop, oneshot: bool, | ||
update_interval: int, bind_address: str, port: int, | ||
browser_path: Optional[Path] = None) -> None: | ||
potoken_extractor = PotokenExtractor(loop, update_interval=update_interval, browser_path=browser_path) | ||
token = await potoken_extractor.run_once() | ||
if oneshot: | ||
print_token_and_exit(token) | ||
|
||
extractor_task = loop.create_task(potoken_extractor.run()) | ||
potoken_server = PotokenServer(potoken_extractor, port=port, bind_address=bind_address) | ||
server_task = loop.create_task(asyncio.to_thread(potoken_server.run)) | ||
|
||
try: | ||
await asyncio.gather(extractor_task, server_task) | ||
except Exception: | ||
# exceptions raised by the tasks are intentionally propogated | ||
# to ensure process exit code is 1 on error | ||
raise | ||
except (KeyboardInterrupt, asyncio.CancelledError): | ||
logger.info('Stopping...') | ||
finally: | ||
potoken_server.stop() | ||
|
||
|
||
def set_logging(log_level: int = logging.DEBUG) -> None: | ||
log_format = '%(asctime)s.%(msecs)03d [%(name)s] [%(levelname)s] %(message)s' | ||
datefmt = '%Y/%m/%d %H:%M:%S' | ||
logging.basicConfig(level=log_level, format=log_format, datefmt=datefmt) | ||
logging.getLogger('asyncio').setLevel(logging.INFO) | ||
logging.getLogger('nodriver').setLevel(logging.WARNING) | ||
logging.getLogger('uc').setLevel(logging.WARNING) | ||
logging.getLogger('websockets').setLevel(logging.WARNING) | ||
|
||
|
||
def args_parse() -> argparse.Namespace: | ||
description = ''' | ||
Retrieve potoken using Chromium runned by nodriver, serve it on a json endpoint | ||
Token is generated on startup, and then every UPDATE_INTERVAL seconds. | ||
With web-server running on default port, the token is available on the | ||
http://127.0.0.1:8080/token endpoint. It is possible to request immediate | ||
token regeneration by accessing http://127.0.0.1:8080/update | ||
''' | ||
parser = argparse.ArgumentParser(description=description, formatter_class=argparse.RawDescriptionHelpFormatter) | ||
parser.add_argument('-o', '--oneshot', action='store_true', default=False, | ||
help='Do not start server. Generate token once, print it and exit') | ||
parser.add_argument('--update-interval', '-u', type=int, default=3600, | ||
help='How ofthen new token is generated, in seconds (default: %(default)s)') | ||
parser.add_argument('--port', '-p', type=int, default=8080, | ||
help='Port webserver is listening on (default: %(default)s)') | ||
parser.add_argument('--bind', '-b', default='127.0.0.1', | ||
help='Address webserver binds to (default: %(default)s)') | ||
parser.add_argument('--chrome-path', '-c', type=Path, default=None, | ||
help='Path to the Chromiun executable') | ||
return parser.parse_args() | ||
|
||
|
||
def main() -> None: | ||
args = args_parse() | ||
set_logging(logging.WARNING if args.oneshot else logging.INFO) | ||
loop = nodriver.loop() | ||
main_task = run(loop, oneshot=args.oneshot, | ||
update_interval=args.update_interval, | ||
bind_address=args.bind, | ||
port=args.port, | ||
browser_path=args.chrome_path | ||
) | ||
loop.run_until_complete(main_task) |
Oops, something went wrong.