diff --git a/playwright_recaptcha/recaptchav2/async_solver.py b/playwright_recaptcha/recaptchav2/async_solver.py index d3fb65a..ad086ff 100644 --- a/playwright_recaptcha/recaptchav2/async_solver.py +++ b/playwright_recaptcha/recaptchav2/async_solver.py @@ -27,7 +27,8 @@ RecaptchaRateLimitError, RecaptchaSolveError, ) -from playwright_recaptcha.recaptchav2.recaptcha_box import AsyncRecaptchaBox + +from .recaptcha_box import AsyncRecaptchaBox class AsyncAudioFile(speech_recognition.AudioFile): @@ -163,18 +164,30 @@ async def _response_callback(self, response: Response) -> None: if token_match is not None: self._token = token_match.group(1) - async def _random_delay(self, short: bool = False) -> None: + async def _random_delay(self, short: bool = True) -> None: """ Delay the browser for a random amount of time. Parameters ---------- short : bool, optional - Whether to delay for a short amount of time, by default False. + Whether to delay for a short amount of time, by default True. """ delay_time = random.randint(150, 350) if short else random.randint(1250, 1500) await self._page.wait_for_timeout(delay_time) + async def _wait_for_value(self, attribute: str) -> None: + """ + Wait for an attribute to have a value. + + Parameters + ---------- + attribute : str + The attribute. + """ + while getattr(self, attribute) is None: + await self._page.wait_for_timeout(250) + async def _get_capsolver_response( self, recaptcha_box: AsyncRecaptchaBox, image_data: bytes ) -> Optional[Dict[str, Any]]: @@ -255,10 +268,10 @@ async def _solve_tiles( if "rc-imageselect-dynamic-selected" in await tile.get_attribute("class"): changing_tiles.append(tile) - await self._random_delay(short=True) + await self._random_delay() while changing_tiles: - for tile in changing_tiles: + for tile in changing_tiles.copy(): if "rc-imageselect-dynamic-selected" in await tile.get_attribute( "class" ): @@ -342,7 +355,7 @@ async def _click_checkbox(self, recaptcha_box: AsyncRecaptchaBox) -> None: while recaptcha_box.frames_are_attached(): if await recaptcha_box.challenge_is_solved(): - if self.token is None: + if self._token is None: raise RecaptchaSolveError break @@ -350,11 +363,7 @@ async def _click_checkbox(self, recaptcha_box: AsyncRecaptchaBox) -> None: if await recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError - if ( - await recaptcha_box.audio_challenge_is_visible() - or await recaptcha_box.audio_challenge_button.is_visible() - and await recaptcha_box.audio_challenge_button.is_enabled() - ): + if await recaptcha_box.challenge_is_visible(): break await self._page.wait_for_timeout(250) @@ -444,10 +453,8 @@ async def _solve_image_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None If the reCAPTCHA rate limit has been exceeded. """ while recaptcha_box.frames_are_attached(): - while self._payload_response is None: - await self._page.wait_for_timeout(250) - - await self._random_delay(short=True) + await self._wait_for_value("_payload_response") + await self._random_delay() capsolver_response = await self._get_capsolver_response( recaptcha_box, await self._payload_response.body() @@ -465,7 +472,7 @@ async def _solve_image_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None recaptcha_box, capsolver_response["solution"]["objects"] ) - await self._random_delay(short=True) + await self._random_delay() self._payload_response = None button = recaptcha_box.skip_button.or_(recaptcha_box.next_button) @@ -485,8 +492,7 @@ async def _solve_image_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None ): await recaptcha_box.new_challenge_button.click() else: - while self._token is None: - await self._page.wait_for_timeout(250) + await self._wait_for_value("_token") if await recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError @@ -507,7 +513,7 @@ async def _solve_audio_challenge(self, recaptcha_box: AsyncRecaptchaBox) -> None RecaptchaRateLimitError If the reCAPTCHA rate limit has been exceeded. """ - await self._random_delay() + await self._random_delay(short=False) while True: url = await self._get_audio_url(recaptcha_box) diff --git a/playwright_recaptcha/recaptchav2/recaptcha_box.py b/playwright_recaptcha/recaptchav2/recaptcha_box.py index c9d32e7..3ce0c32 100644 --- a/playwright_recaptcha/recaptchav2/recaptcha_box.py +++ b/playwright_recaptcha/recaptchav2/recaptcha_box.py @@ -69,12 +69,14 @@ def _get_recaptcha_frame_pairs( return frame_pairs @overload + @staticmethod def _check_if_attached( func: Callable[[AsyncRecaptchaBox], Awaitable[bool]] ) -> Callable[[AsyncRecaptchaBox], Awaitable[bool]]: ... @overload + @staticmethod def _check_if_attached( func: Callable[[SyncRecaptchaBox], bool] ) -> Callable[[SyncRecaptchaBox], bool]: @@ -378,16 +380,15 @@ def from_frames(cls, frames: Iterable[SyncFrame]) -> SyncRecaptchaBox: frame_pairs = cls._get_recaptcha_frame_pairs(frames) for anchor_frame, bframe_frame in frame_pairs: - checkbox = anchor_frame.get_by_role("checkbox", name="I'm not a robot") + recaptcha_box = cls(anchor_frame, bframe_frame) if ( - bframe_frame.get_by_role( - "button", name="Get an audio challenge" - ).is_visible() - or checkbox.is_visible() - and not checkbox.is_checked() + recaptcha_box.checkbox.is_visible() + and not recaptcha_box.checkbox.is_checked() + or recaptcha_box.audio_challenge_button.is_visible() + or recaptcha_box.image_challenge_button.is_visible() ): - return cls(anchor_frame, bframe_frame) + return recaptcha_box raise RecaptchaNotFoundError("No unchecked reCAPTCHA boxes were found.") @@ -551,16 +552,15 @@ async def from_frames(cls, frames: Iterable[AsyncFrame]) -> AsyncRecaptchaBox: frame_pairs = cls._get_recaptcha_frame_pairs(frames) for anchor_frame, bframe_frame in frame_pairs: - checkbox = anchor_frame.get_by_role("checkbox", name="I'm not a robot") + recaptcha_box = cls(anchor_frame, bframe_frame) if ( - await bframe_frame.get_by_role( - "button", name="Get an audio challenge" - ).is_visible() - or await checkbox.is_visible() - and not await checkbox.is_checked() + await recaptcha_box.checkbox.is_visible() + and not await recaptcha_box.checkbox.is_checked() + or await recaptcha_box.audio_challenge_button.is_visible() + or await recaptcha_box.image_challenge_button.is_visible() ): - return cls(anchor_frame, bframe_frame) + return recaptcha_box raise RecaptchaNotFoundError("No unchecked reCAPTCHA boxes were found.") diff --git a/playwright_recaptcha/recaptchav2/sync_solver.py b/playwright_recaptcha/recaptchav2/sync_solver.py index 32814e6..72e22e0 100644 --- a/playwright_recaptcha/recaptchav2/sync_solver.py +++ b/playwright_recaptcha/recaptchav2/sync_solver.py @@ -19,7 +19,8 @@ RecaptchaRateLimitError, RecaptchaSolveError, ) -from playwright_recaptcha.recaptchav2.recaptcha_box import SyncRecaptchaBox + +from .recaptcha_box import SyncRecaptchaBox class SyncSolver: @@ -127,18 +128,30 @@ def _response_callback(self, response: Response) -> None: if token_match is not None: self._token = token_match.group(1) - def _random_delay(self, short: bool = False) -> None: + def _random_delay(self, short: bool = True) -> None: """ Delay the browser for a random amount of time. Parameters ---------- short : bool, optional - Whether to delay for a short amount of time, by default False. + Whether to delay for a short amount of time, by default True. """ delay_time = random.randint(150, 350) if short else random.randint(1250, 1500) self._page.wait_for_timeout(delay_time) + def _wait_for_value(self, attribute: str) -> None: + """ + Wait for an attribute to have a value. + + Parameters + ---------- + attribute : str + The attribute. + """ + while getattr(self, attribute) is None: + self._page.wait_for_timeout(250) + def _get_capsolver_response( self, recaptcha_box: SyncRecaptchaBox, image_data: bytes ) -> Optional[Dict[str, Any]]: @@ -219,10 +232,10 @@ def _solve_tiles( if "rc-imageselect-dynamic-selected" in tile.get_attribute("class"): changing_tiles.append(tile) - self._random_delay(short=True) + self._random_delay() while changing_tiles: - for tile in changing_tiles: + for tile in changing_tiles.copy(): if "rc-imageselect-dynamic-selected" in tile.get_attribute("class"): continue @@ -300,11 +313,7 @@ def _click_checkbox(self, recaptcha_box: SyncRecaptchaBox) -> None: if recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError - if ( - recaptcha_box.audio_challenge_is_visible() - or recaptcha_box.audio_challenge_button.is_visible() - and recaptcha_box.audio_challenge_button.is_enabled() - ): + if recaptcha_box.challenge_is_visible(): break self._page.wait_for_timeout(250) @@ -392,10 +401,8 @@ def _solve_image_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None: If the reCAPTCHA rate limit has been exceeded. """ while recaptcha_box.frames_are_attached(): - while self._payload_response is None: - self._page.wait_for_timeout(250) - - self._random_delay(short=True) + self._wait_for_value("_payload_response") + self._random_delay() capsolver_response = self._get_capsolver_response( recaptcha_box, self._payload_response.body() @@ -410,7 +417,7 @@ def _solve_image_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None: continue self._solve_tiles(recaptcha_box, capsolver_response["solution"]["objects"]) - self._random_delay(short=True) + self._random_delay() self._payload_response = None button = recaptcha_box.skip_button.or_(recaptcha_box.next_button) @@ -430,8 +437,7 @@ def _solve_image_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None: ): recaptcha_box.new_challenge_button.click() else: - while self._token is None: - self._page.wait_for_timeout(250) + self._wait_for_value("_token") if recaptcha_box.rate_limit_is_visible(): raise RecaptchaRateLimitError @@ -452,7 +458,7 @@ def _solve_audio_challenge(self, recaptcha_box: SyncRecaptchaBox) -> None: RecaptchaRateLimitError If the reCAPTCHA rate limit has been exceeded. """ - self._random_delay() + self._random_delay(short=False) while True: url = self._get_audio_url(recaptcha_box)