Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RecaptchaBox class implementation #14

Merged
merged 6 commits into from
Feb 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 85 additions & 117 deletions playwright_recaptcha/recaptchav2/async_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,14 @@
import httpx
import pydub
import speech_recognition
from playwright.async_api import Frame, Locator, Page, Response
from playwright.async_api import Page, Response

from playwright_recaptcha.errors import (
RecaptchaNotFoundError,
RecaptchaRateLimitError,
RecaptchaSolveError,
)
from playwright_recaptcha.recaptchav2.utils import (
get_recaptcha_checkbox,
get_recaptcha_frame,
)
from playwright_recaptcha.recaptchav2.recaptcha_box import AsyncRecaptchaBox


class AsyncSolver:
Expand Down Expand Up @@ -71,74 +68,6 @@ async def __aenter__(self) -> AsyncSolver:
async def __aexit__(self, *args: Any) -> None:
self.close()

async def _random_delay(self) -> None:
    """Pause for a randomly chosen duration between 1 and 4 seconds."""
    # The page timer takes milliseconds; both bounds are inclusive.
    delay_ms = random.randint(1000, 4000)
    await self._page.wait_for_timeout(delay_ms)

async def _extract_token(self, response: Response) -> None:
    """
    Store the g-recaptcha-response token found in a userverify response.

    Responses whose URL does not match the reCAPTCHA userverify endpoint
    are ignored. ``self.token`` is only assigned when the response body
    contains a ``"uvresp"`` entry.

    Parameters
    ----------
    response : Response
        The response to inspect for the g-recaptcha-response token.
    """
    endpoint_match = re.search(
        "/recaptcha/(api2|enterprise)/userverify", response.url
    )

    if endpoint_match is not None:
        # Only read the body for userverify responses.
        body = await response.text()
        found = re.search('"uvresp","(.*?)"', body)

        if found is not None:
            self.token = found.group(1)

async def _get_audio_url(self, recaptcha_frame: Frame) -> str:
    """
    Switch to the audio challenge if needed and return its download URL.

    Parameters
    ----------
    recaptcha_frame : Frame
        The reCAPTCHA frame.

    Returns
    -------
    str
        The ``href`` of the "download audio as MP3" link.

    Raises
    ------
    RecaptchaRateLimitError
        If the "Try again later" text becomes visible while waiting.
    """
    switch_to_audio = recaptcha_frame.get_by_role(
        "button", name="Get an audio challenge"
    )

    # The button is only clicked when it is currently shown.
    if await switch_to_audio.is_visible():
        await switch_to_audio.click(force=True)

    play_prompt = recaptcha_frame.get_by_text("Press PLAY to listen")
    rate_limit_notice = recaptcha_frame.get_by_text("Try again later")

    # Poll every 100 ms until the audio prompt is both visible and enabled.
    while not (await play_prompt.is_visible() and await play_prompt.is_enabled()):
        if await rate_limit_notice.is_visible():
            raise RecaptchaRateLimitError

        await self._page.wait_for_timeout(100)

    download_link = recaptcha_frame.get_by_role(
        "link", name="Alternatively, download audio as MP3"
    )

    return await download_link.get_attribute("href")

@staticmethod
async def _convert_audio_to_text(audio_url: str) -> Optional[str]:
"""
Expand Down Expand Up @@ -187,18 +116,70 @@ async def _convert_audio_to_text(audio_url: str) -> Optional[str]:

return text["alternative"][0]["transcript"] if text else None

async def _random_delay(self) -> None:
    """Wait a random whole number of milliseconds between 1000 and 4000."""
    # randint includes both endpoints, so the wait is 1 to 4 seconds.
    pause_ms = random.randint(1000, 4000)
    await self._page.wait_for_timeout(pause_ms)

async def _extract_token(self, response: Response) -> None:
    """
    Capture the g-recaptcha-response token from a userverify response.

    Any response whose URL is not a reCAPTCHA ``api2`` or ``enterprise``
    userverify endpoint is skipped. When the body has no ``"uvresp"``
    entry, ``self.token`` is left as it was.

    Parameters
    ----------
    response : Response
        The response to extract the g-recaptcha-response token from.
    """
    url_match = re.search(
        "/recaptcha/(api2|enterprise)/userverify", response.url
    )

    if url_match is not None:
        # The body is fetched only after the URL check passes.
        payload = await response.text()
        token_hit = re.search('"uvresp","(.*?)"', payload)

        if token_hit is not None:
            self.token = token_hit.group(1)

async def _get_audio_url(self, recaptcha_box: AsyncRecaptchaBox) -> str:
    """
    Switch to the audio challenge if needed and return its download URL.

    Parameters
    ----------
    recaptcha_box : AsyncRecaptchaBox
        The reCAPTCHA box.

    Returns
    -------
    str
        The ``href`` attribute of the box's audio download button.

    Raises
    ------
    RecaptchaRateLimitError
        If the box reports the rate limit message as visible while waiting.
    """
    # The audio challenge button is only clicked when it is currently shown.
    if await recaptcha_box.audio_challenge_button.is_visible():
        await recaptcha_box.audio_challenge_button.click(force=True)

    # Poll every 100 ms until the box reports the audio challenge as visible.
    while not await recaptcha_box.audio_challenge_is_visible():
        if await recaptcha_box.rate_limit_is_visible():
            raise RecaptchaRateLimitError

        await self._page.wait_for_timeout(100)

    download_link = recaptcha_box.audio_download_button
    return await download_link.get_attribute("href")

async def _submit_audio_text(
self, recaptcha_frame: Frame, recaptcha_checkbox: Locator, text: str
self, recaptcha_box: AsyncRecaptchaBox, text: str
) -> None:
"""
Submit the reCAPTCHA audio text.

Parameters
----------
recaptcha_frame : Frame
The reCAPTCHA frame.
recaptcha_checkbox : Locator
The reCAPTCHA checkbox.
recaptcha_box : AsyncRecaptchaBox
The reCAPTCHA box.
text : str
The reCAPTCHA audio text.

Expand All @@ -207,26 +188,17 @@ async def _submit_audio_text(
RecaptchaRateLimitError
If the reCAPTCHA rate limit has been exceeded.
"""
textbox = recaptcha_frame.get_by_role("textbox", name="Enter what you hear")
verify_button = recaptcha_frame.get_by_role("button", name="Verify")

await textbox.fill(text)
await verify_button.click()

solve_failure = recaptcha_frame.get_by_text(
"Multiple correct solutions required - please solve more."
)
await recaptcha_box.audio_challenge_textbox.fill(text)
await recaptcha_box.audio_challenge_verify_button.click()

rate_limit = recaptcha_frame.get_by_text("Try again later")

while not recaptcha_frame.is_detached():
while recaptcha_box.frames_are_attached():
if (
await recaptcha_checkbox.is_checked()
or await solve_failure.is_visible()
await recaptcha_box.checkbox.is_checked()
or await recaptcha_box.solve_failure_is_visible()
):
break

if await rate_limit.is_visible():
if await recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError

await self._page.wait_for_timeout(100)
Expand Down Expand Up @@ -261,63 +233,59 @@ async def solve_recaptcha(self, attempts: Optional[int] = None) -> str:
RecaptchaSolveError
If the reCAPTCHA could not be solved.
"""
self.token = None
self._page.on("response", self._extract_token)
attempts = attempts or self._attempts

await self._page.wait_for_load_state("networkidle")
recaptcha_frame = get_recaptcha_frame(self._page.frames)
recaptcha_checkbox = get_recaptcha_checkbox(self._page.frames)
attempts = attempts or self._attempts
recaptcha_box = await AsyncRecaptchaBox.from_frames(self._page.frames)

if await recaptcha_checkbox.is_hidden():
if await recaptcha_box.checkbox.is_hidden():
raise RecaptchaNotFoundError

await recaptcha_checkbox.click(force=True)
audio_challenge_text = recaptcha_frame.get_by_text("Press PLAY to listen")

audio_challenge_button = recaptcha_frame.get_by_role(
"button", name="Get an audio challenge"
)
await recaptcha_box.checkbox.click(force=True)

while True:
if (
await audio_challenge_text.is_visible()
or await audio_challenge_button.is_visible()
and await audio_challenge_button.is_enabled()
await recaptcha_box.audio_challenge_is_visible()
or await recaptcha_box.audio_challenge_button.is_visible()
and await recaptcha_box.audio_challenge_button.is_enabled()
):
break

if await recaptcha_checkbox.is_checked():
if (
not recaptcha_box.frames_are_attached()
or await recaptcha_box.checkbox.is_checked()
):
if self.token is None:
raise RecaptchaSolveError

return self.token

await self._page.wait_for_timeout(100)

new_challenge_button = recaptcha_frame.get_by_role(
"button", name="Get a new challenge"
)

while attempts > 0:
await self._random_delay()
url = await self._get_audio_url(recaptcha_frame)
url = await self._get_audio_url(recaptcha_box)
text = await self._convert_audio_to_text(url)

if text is None:
await new_challenge_button.click()
await recaptcha_box.new_challenge_button.click()
attempts -= 1
continue

await self._random_delay()
await self._submit_audio_text(recaptcha_frame, recaptcha_checkbox, text)
await self._submit_audio_text(recaptcha_box, text)

if recaptcha_frame.is_detached() or await recaptcha_checkbox.is_checked():
if (
not recaptcha_box.frames_are_attached()
or await recaptcha_box.checkbox.is_checked()
):
if self.token is None:
raise RecaptchaSolveError

return self.token

await new_challenge_button.click()
await recaptcha_box.new_challenge_button.click()
attempts -= 1

raise RecaptchaSolveError
Loading