Skip to content

Commit

Permalink
Fix broken implict flow auth (#134)
Browse files Browse the repository at this point in the history
* Fix expired auth in vk.UserAPI

* Add 2fa handler to vk.UserAPI

* Writing and refactoring tests

* Fixing some test errors

* Mixing with vk.API, not vk.UserAPI

* Swtich to Kate mobile app_id, fix error with vk.UserAPI interactivemixin

* Processing 401 error

* Update docs, increase coverage, fix typos

* Fix CI (captcha error)

* Add env vars to CI

* Add vars to contribution.rst, fix tox.ini

* Fix tox.ini

* test

* Write captcha handler for vk.UserAPI

* Update get_captcha_key, documentation

* Check if oauth request successed

* Change client_id

* Skip tests of vk.UserAPI on CI

* Recall auth check code if it's invalid

* More logging!
  • Loading branch information
YariKartoshe4ka authored Jun 20, 2022
1 parent 5b47271 commit fbf74a5
Show file tree
Hide file tree
Showing 11 changed files with 324 additions and 166 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ jobs:
- name: Run test suite
run: tox --skip-pkg-install
env:
CI_RUN: 1
VK_ACCESS_TOKEN: ${{ secrets.VK_ACCESS_TOKEN }}
VK_USER_LOGIN: ${{ secrets.VK_USER_LOGIN }}
VK_USER_PASSWORD: ${{ secrets.VK_USER_PASSWORD }}
- name: Upload coverage
uses: codecov/codecov-action@v2

Expand Down
9 changes: 9 additions & 0 deletions docs/contribution.rst
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ Since some test suites use real calls to the VK API, you should create the neces
- Access token for VK API. We reccomend to use community token, because it doesn't have an expiration date
- Create your community, go to its settings (API section), create API key with messages scopes

* - VK_USER_LOGIN
- Login from the VK account. For rare tests, you can use your account, otherwise we recommend to register a test one
- \-

* - VK_USER_PASSWORD
- Password from the VK account. For rare tests, you can use your account, otherwise we recommend to register a test one
- \-



Logging
-------
Expand Down
1 change: 1 addition & 0 deletions docs/usage.rst
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ vk.UserAPI
----------

.. autoclass:: vk.session.UserAPI
:members:


vk.CommunityAPI
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[build-system]
requires = ["setuptools"]
requires = ["setuptools>=60"]
build-backend = "setuptools.build_meta"

[project]
Expand Down
205 changes: 141 additions & 64 deletions src/vk/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import requests

from .api import APINamespace
from .exceptions import VkAPIError, VkAuthError
from .exceptions import ErrorCodes, VkAPIError, VkAuthError
from .utils import stringify

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -124,10 +124,17 @@ def __init__(self, access_token=None, **kwargs):
super().__init__(**kwargs)
self.access_token = access_token

def get_captcha_key(self, request):
"""Default behavior on CAPTCHA is to raise exception. Redefine in a subclass
def get_captcha_key(self, api_error):
"""Callback to retrieve CAPTCHA key. Default behavior is to raise exception,
redefine in a subclass
Args:
api_error (vk.exceptions.VkAPIError): Captcha error that occurred
Returns:
Captcha solution (a short string consisting of lowercase letters and numbers)
"""
raise request.api_error
raise api_error

def on_api_error_14(self, request):
"""Captcha error handler. Retrieves captcha via :meth:`API.get_captcha_key` and
Expand All @@ -144,12 +151,14 @@ def prepare_request(self, request):

class UserAPI(API):
"""Subclass of :class:`vk.session.API`. It differs only in that it can get access token
using app id and user credentials (Implicit flow authorization).
using user credentials (`Implicit flow authorization
<https://dev.vk.com/api/access-token/implicit-flow-user>`__).
Args:
user_login (Optional[str]): User login, optional when using :class:`InteractiveMixin`
user_password (Optional[str]): User password, optional when using :class:`InteractiveMixin`
app_id (Optional[int]): App ID
client_id (Optional[int]): ID of the application to authorize with, defaults to
"VK Admin" app ID
scope (Optional[Union[str, int]]): Access rights you need. Can be passed
comma-separated list of scopes, or bitmask sum all of them (see `official
documentation <https://dev.vk.com/reference/access-rights>`__). Defaults
Expand All @@ -165,124 +174,187 @@ class UserAPI(API):
>>> api = vk.UserAPI(
... user_login='...',
... user_password='...',
... app_id=123456,
... scope='offline,wall',
... v='5.131'
... )
>>> print(api.users.get(user_ids=1))
[{'id': 1, 'first_name': 'Павел', 'last_name': 'Дуров', ... }]
"""
LOGIN_URL = 'https://m.vk.com'
LOGIN_URL = 'https://oauth.vk.com'
AUTHORIZE_URL = 'https://oauth.vk.com/authorize'

def __init__(self, user_login=None, user_password=None, app_id=None, scope='offline', **kwargs):
def __init__(self, user_login=None, user_password=None, client_id=6121396, scope='offline', **kwargs):
self.user_login = user_login
self.user_password = user_password
self.app_id = app_id
self.client_id = client_id
self.scope = scope

super().__init__(self.get_access_token(), **kwargs)

@staticmethod
def get_form_action(response):
def _get_form_action(response):
form_action = findall(r'<form(?= ).* action="(.+)"', response.text)
if form_action:
return form_action[0]
else:
raise VkAuthError('No form on page {}'.format(response.url))
raise VkAuthError(f'No form on page {response.url}')

def get_response_url_queries(self, response):
if not response.ok:
if response.status_code == 401:
raise VkAuthError(response.json()['error_description'])
else:
response.raise_for_status()

return self.get_url_queries(response.url)
@staticmethod
def _get_input_value(response, name):
input_value = findall(rf'<input.* type="hidden".* name="{name}".* value="(.+)"', response.text)
if input_value:
return input_value[0]
raise VkAuthError(f'No input with name `{name}` on page {response.url}')

@staticmethod
def get_url_queries(url):
def _get_url_queries(url):
parsed_url = urllib.parse.urlparse(url)
url_queries = urllib.parse.parse_qsl(parsed_url.fragment)
# We lose repeating keys values
return dict(url_queries)
return dict(urllib.parse.parse_qsl(parsed_url.fragment or parsed_url.query))

@staticmethod
def _oauth_is_request_success(response):
if not response.ok:
if response.status_code == 401:
description = response.json()['error_description']
logger.error('OAuth authorization failed: %s', description)
raise VkAuthError(description)
response.raise_for_status()

def get_access_token(self):
auth_session = requests.Session()
auth_session.headers['Origin'] = 'https://oauth.vk.com'

if self.login(auth_session):
return self.authorize(auth_session)

def get_login_form_data(self):
def get_login_form_data(self, response):
return {
'ip_h': self._get_input_value(response, 'ip_h'),
'lg_domain_h': self._get_input_value(response, 'lg_domain_h'),
'to': self._get_input_value(response, 'to'),
'email': self.user_login,
'pass': self.user_password,
}

def login(self, auth_session):
# Get login page
login_page_response = auth_session.get(self.LOGIN_URL)
# Get login form action. It must contains ip_h and lg_h values
login_action = self.get_form_action(login_page_response)
# Login using user credentials
login_response = auth_session.post(login_action, self.get_login_form_data())
def login(self, auth_session, login_response=None):
if not login_response:
logger.debug('Start of the login process')
# Get login page
login_page_response = auth_session.get(self.AUTHORIZE_URL, params=self.get_auth_params())
# Check if params for OAuth is enough
self._oauth_is_request_success(login_page_response)
# Get login form action
login_action = self._get_form_action(login_page_response)
# Login using user credentials
login_response = auth_session.post(login_action, self.get_login_form_data(login_page_response))

if 'remixsid' in auth_session.cookies or 'remixsid6' in auth_session.cookies:
logger.debug('Successfully logged in')
return True

url_queries = self.get_url_queries(login_response.url)
url_queries = self._get_url_queries(login_response.url)

if 'sid' in url_queries:
self.auth_captcha_is_needed(login_response)
logger.debug('Auth captcha is needed')
return self.auth_captcha_is_needed(auth_session, login_response)

if url_queries.get('act') == 'authcheck':
logger.debug('Auth check code is needed')
return self.auth_check_is_needed(auth_session, login_response)

elif url_queries.get('act') == 'authcheck':
self.auth_check_is_needed(login_response.text)
if 'security_check' in url_queries:
logger.debug('Phone number is needed')
return self.phone_number_is_needed(auth_session, login_response)

elif 'security_check' in url_queries:
self.phone_number_is_needed(login_response.text)
logger.error('Unknown login error. Last URL: %s.', login_response.url)
raise VkAuthError('Login error (e.g. incorrect password)')

def auth_captcha_is_needed(self, auth_session, response):
# Get login form action
login_action = self._get_form_action(response)

# Get captcha data (img and sid)
captcha_img = findall(r'<img.* id="captcha".* src="([^\"]+)"', response.text)
if captcha_img:
captcha_img = captcha_img[0]
else:
raise VkAuthError('Login error (e.g. incorrect password)')
raise VkAuthError(f'No captcha on page {response.url}')

captcha_sid = self._get_input_value(response, 'captcha_sid')

# Create a bogus error
error_data = {
'error_code': ErrorCodes.CAPTCHA_NEEDED,
'error_msg': 'Captcha error occured during authorization',
'captcha_sid': captcha_sid,
'captcha_img': captcha_img
}
error = VkAPIError(error_data)

# Login again using user credentials and solved captcha
login_form_data = {
**self.get_login_form_data(response),
'captcha_sid': captcha_sid,
'captcha_key': self.get_captcha_key(error)
}
login_response = auth_session.post(login_action, login_form_data)

# Re-login with solved captcha
return self.login(auth_session, login_response)

def get_auth_check_code(self):
"""Callback to retrieve authentication check code (if account supports 2FA). Default
behavior is to raise exception, redefine in a subclass
Returns:
The authentication check code can be obtained in the sent SMS, using Google
Authenticator (or another authenticator), or it can be one of ten backup codes
"""
raise NotImplementedError

def auth_check_is_needed(self, auth_session, response):
auth_check_action = self.LOGIN_URL + self._get_form_action(response)
login_response = auth_session.post(auth_check_action, {'code': self.get_auth_check_code()})

# Re-login with auth check code
return self.login(auth_session, login_response)

def phone_number_is_needed(self, auth_session, response): # noqa: U100
raise NotImplementedError

def get_auth_params(self):
return {
'client_id': self.app_id,
'client_id': self.client_id,
'scope': self.scope,
'display': 'mobile',
'response_type': 'token',
}

def authorize(self, auth_session):
"""
OAuth2
"""
logger.debug('Start of the OAuth authorization process')
# Ask access
ask_access_response = auth_session.post(self.AUTHORIZE_URL, self.get_auth_params())
url_queries = self.get_response_url_queries(ask_access_response)
self._oauth_is_request_success(ask_access_response)
url_queries = self._get_url_queries(ask_access_response.url)

if 'access_token' not in url_queries:
if 'authorize_url' not in url_queries:
logger.debug('Grant access to app')
# Grant access
grant_access_action = self.get_form_action(ask_access_response)
grant_access_action = self._get_form_action(ask_access_response)
grant_access_response = auth_session.post(grant_access_action)
url_queries = self.get_response_url_queries(grant_access_response)
url_queries = self._get_url_queries(grant_access_response.url)

return self.process_auth_url_queries(url_queries)
url_queries = self._get_url_queries(urllib.parse.unquote(url_queries['authorize_url']))

def process_auth_url_queries(self, url_queries):
self.expires_in = url_queries.get('expires_in')
self.user_id = url_queries.get('user_id')
return url_queries.get('access_token')

def on_api_error_15(self, request):
"""
15. Access denied
- due to scope
"""
logger.error('Authorization failed. Access token will be dropped')
if 'access_token' in url_queries:
logger.debug('Successfully authorized')
return url_queries['access_token']

del request.method_params['access_token']
self.access_token = self.get_access_token()

return self.send(request)
logger.error('Unknown OAuth authorization error. URL queries = %s.', url_queries)
raise VkAuthError('OAuth authorization failed')


class CommunityAPI(UserAPI):
Expand Down Expand Up @@ -336,10 +408,15 @@ class API(InteractiveMixin, vk.API):
"""

def __setattr__(self, name, value):
if name in dir(self.__class__) and not value:
attrs = dir(self.__class__)

if name in attrs and not value:
return

object.__setattr__(self, name, value)
if name in filter(lambda x: isinstance(getattr(self.__class__, x), property), attrs):
return object.__setattr__(self, '_cached_' + name, value)

return object.__setattr__(self, name, value)

@property
def user_login(self):
Expand All @@ -359,11 +436,11 @@ def access_token(self):
self._cached_access_token = input('VK API access token: ')
return self._cached_access_token

def get_captcha_key(self, captcha_image_url):
def get_captcha_key(self, api_error):
"""
Read CAPTCHA key from shell
"""
print('Open CAPTCHA image url: ', captcha_image_url)
print('Open CAPTCHA image url: ', api_error.captcha_img)
return input('Enter CAPTCHA key: ')

def get_auth_check_code(self):
Expand Down
Loading

0 comments on commit fbf74a5

Please sign in to comment.