Skip to content

Commit

Permalink
Small improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
Xewdy444 committed Oct 15, 2023
1 parent 8f29a1c commit 9e6815d
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 51 deletions.
44 changes: 25 additions & 19 deletions playwright_recaptcha/recaptchav2/async_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@
RecaptchaRateLimitError,
RecaptchaSolveError,
)
from playwright_recaptcha.recaptchav2.recaptcha_box import AsyncRecaptchaBox

from .recaptcha_box import AsyncRecaptchaBox


class AsyncAudioFile(speech_recognition.AudioFile):
Expand Down Expand Up @@ -163,18 +164,30 @@ async def _response_callback(self, response: Response) -> None:
if token_match is not None:
self._token = token_match.group(1)

async def _random_delay(self, short: bool = False) -> None:
async def _random_delay(self, short: bool = True) -> None:
"""
Delay the browser for a random amount of time.
Parameters
----------
short : bool, optional
Whether to delay for a short amount of time, by default False.
Whether to delay for a short amount of time, by default True.
"""
delay_time = random.randint(150, 350) if short else random.randint(1250, 1500)
await self._page.wait_for_timeout(delay_time)

async def _wait_for_value(self, attribute: str) -> None:
"""
Wait for an attribute to have a value.
Parameters
----------
attribute : str
The attribute.
"""
while getattr(self, attribute) is None:
await self._page.wait_for_timeout(250)

async def _get_capsolver_response(
self, recaptcha_box: AsyncRecaptchaBox, image_data: bytes
) -> Optional[Dict[str, Any]]:
Expand Down Expand Up @@ -255,10 +268,10 @@ async def _solve_tiles(
if "rc-imageselect-dynamic-selected" in await tile.get_attribute("class"):
changing_tiles.append(tile)

await self._random_delay(short=True)
await self._random_delay()

while changing_tiles:
for tile in changing_tiles:
for tile in changing_tiles.copy():
if "rc-imageselect-dynamic-selected" in await tile.get_attribute(
"class"
):
Expand Down Expand Up @@ -342,19 +355,15 @@ async def _click_checkbox(self, recaptcha_box: AsyncRecaptchaBox) -> None:

while recaptcha_box.frames_are_attached():
if await recaptcha_box.challenge_is_solved():
if self.token is None:
if self._token is None:
raise RecaptchaSolveError

break

if await recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError

if (
await recaptcha_box.audio_challenge_is_visible()
or await recaptcha_box.audio_challenge_button.is_visible()
and await recaptcha_box.audio_challenge_button.is_enabled()
):
if await recaptcha_box.challenge_is_visible():
break

await self._page.wait_for_timeout(250)
Expand Down Expand Up @@ -444,10 +453,8 @@ async def _solve_image_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None
If the reCAPTCHA rate limit has been exceeded.
"""
while recaptcha_box.frames_are_attached():
while self._payload_response is None:
await self._page.wait_for_timeout(250)

await self._random_delay(short=True)
await self._wait_for_value("_payload_response")
await self._random_delay()

capsolver_response = await self._get_capsolver_response(
recaptcha_box, await self._payload_response.body()
Expand All @@ -465,7 +472,7 @@ async def _solve_image_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None
recaptcha_box, capsolver_response["solution"]["objects"]
)

await self._random_delay(short=True)
await self._random_delay()

self._payload_response = None
button = recaptcha_box.skip_button.or_(recaptcha_box.next_button)
Expand All @@ -485,8 +492,7 @@ async def _solve_image_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None
):
await recaptcha_box.new_challenge_button.click()
else:
while self._token is None:
await self._page.wait_for_timeout(250)
await self._wait_for_value("_token")

if await recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError
Expand All @@ -507,7 +513,7 @@ async def _solve_audio_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None
RecaptchaRateLimitError
If the reCAPTCHA rate limit has been exceeded.
"""
await self._random_delay()
await self._random_delay(short=False)

while True:
url = await self._get_audio_url(recaptcha_box)
Expand Down
28 changes: 14 additions & 14 deletions playwright_recaptcha/recaptchav2/recaptcha_box.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,14 @@ def _get_recaptcha_frame_pairs(
return frame_pairs

@overload
@staticmethod
def _check_if_attached(
func: Callable[[AsyncRecaptchaBox], Awaitable[bool]]
) -> Callable[[AsyncRecaptchaBox], Awaitable[bool]]:
...

@overload
@staticmethod
def _check_if_attached(
func: Callable[[SyncRecaptchaBox], bool]
) -> Callable[[SyncRecaptchaBox], bool]:
Expand Down Expand Up @@ -378,16 +380,15 @@ def from_frames(cls, frames: Iterable[SyncFrame]) -> SyncRecaptchaBox:
frame_pairs = cls._get_recaptcha_frame_pairs(frames)

for anchor_frame, bframe_frame in frame_pairs:
checkbox = anchor_frame.get_by_role("checkbox", name="I'm not a robot")
recaptcha_box = cls(anchor_frame, bframe_frame)

if (
bframe_frame.get_by_role(
"button", name="Get an audio challenge"
).is_visible()
or checkbox.is_visible()
and not checkbox.is_checked()
recaptcha_box.checkbox.is_visible()
and not recaptcha_box.checkbox.is_checked()
or recaptcha_box.audio_challenge_button.is_visible()
or recaptcha_box.image_challenge_button.is_visible()
):
return cls(anchor_frame, bframe_frame)
return recaptcha_box

raise RecaptchaNotFoundError("No unchecked reCAPTCHA boxes were found.")

Expand Down Expand Up @@ -551,16 +552,15 @@ async def from_frames(cls, frames: Iterable[AsyncFrame]) -> AsyncRecaptchaBox:
frame_pairs = cls._get_recaptcha_frame_pairs(frames)

for anchor_frame, bframe_frame in frame_pairs:
checkbox = anchor_frame.get_by_role("checkbox", name="I'm not a robot")
recaptcha_box = cls(anchor_frame, bframe_frame)

if (
await bframe_frame.get_by_role(
"button", name="Get an audio challenge"
).is_visible()
or await checkbox.is_visible()
and not await checkbox.is_checked()
await recaptcha_box.checkbox.is_visible()
and not await recaptcha_box.checkbox.is_checked()
or await recaptcha_box.audio_challenge_button.is_visible()
or await recaptcha_box.image_challenge_button.is_visible()
):
return cls(anchor_frame, bframe_frame)
return recaptcha_box

raise RecaptchaNotFoundError("No unchecked reCAPTCHA boxes were found.")

Expand Down
42 changes: 24 additions & 18 deletions playwright_recaptcha/recaptchav2/sync_solver.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
RecaptchaRateLimitError,
RecaptchaSolveError,
)
from playwright_recaptcha.recaptchav2.recaptcha_box import SyncRecaptchaBox

from .recaptcha_box import SyncRecaptchaBox


class SyncSolver:
Expand Down Expand Up @@ -127,18 +128,30 @@ def _response_callback(self, response: Response) -> None:
if token_match is not None:
self._token = token_match.group(1)

def _random_delay(self, short: bool = False) -> None:
def _random_delay(self, short: bool = True) -> None:
"""
Delay the browser for a random amount of time.
Parameters
----------
short : bool, optional
Whether to delay for a short amount of time, by default False.
Whether to delay for a short amount of time, by default True.
"""
delay_time = random.randint(150, 350) if short else random.randint(1250, 1500)
self._page.wait_for_timeout(delay_time)

def _wait_for_value(self, attribute: str) -> None:
"""
Wait for an attribute to have a value.
Parameters
----------
attribute : str
The attribute.
"""
while getattr(self, attribute) is None:
self._page.wait_for_timeout(250)

def _get_capsolver_response(
self, recaptcha_box: SyncRecaptchaBox, image_data: bytes
) -> Optional[Dict[str, Any]]:
Expand Down Expand Up @@ -219,10 +232,10 @@ def _solve_tiles(
if "rc-imageselect-dynamic-selected" in tile.get_attribute("class"):
changing_tiles.append(tile)

self._random_delay(short=True)
self._random_delay()

while changing_tiles:
for tile in changing_tiles:
for tile in changing_tiles.copy():
if "rc-imageselect-dynamic-selected" in tile.get_attribute("class"):
continue

Expand Down Expand Up @@ -300,11 +313,7 @@ def _click_checkbox(self, recaptcha_box: SyncRecaptchaBox) -> None:
if recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError

if (
recaptcha_box.audio_challenge_is_visible()
or recaptcha_box.audio_challenge_button.is_visible()
and recaptcha_box.audio_challenge_button.is_enabled()
):
if recaptcha_box.challenge_is_visible():
break

self._page.wait_for_timeout(250)
Expand Down Expand Up @@ -392,10 +401,8 @@ def _solve_image_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None:
If the reCAPTCHA rate limit has been exceeded.
"""
while recaptcha_box.frames_are_attached():
while self._payload_response is None:
self._page.wait_for_timeout(250)

self._random_delay(short=True)
self._wait_for_value("_payload_response")
self._random_delay()

capsolver_response = self._get_capsolver_response(
recaptcha_box, self._payload_response.body()
Expand All @@ -410,7 +417,7 @@ def _solve_image_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None:
continue

self._solve_tiles(recaptcha_box, capsolver_response["solution"]["objects"])
self._random_delay(short=True)
self._random_delay()

self._payload_response = None
button = recaptcha_box.skip_button.or_(recaptcha_box.next_button)
Expand All @@ -430,8 +437,7 @@ def _solve_image_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None:
):
recaptcha_box.new_challenge_button.click()
else:
while self._token is None:
self._page.wait_for_timeout(250)
self._wait_for_value("_token")

if recaptcha_box.rate_limit_is_visible():
raise RecaptchaRateLimitError
Expand All @@ -452,7 +458,7 @@ def _solve_audio_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None:
RecaptchaRateLimitError
If the reCAPTCHA rate limit has been exceeded.
"""
self._random_delay()
self._random_delay(short=False)

while True:
url = self._get_audio_url(recaptcha_box)
Expand Down

0 comments on commit 9e6815d

Please sign in to comment.