Skip to content

Commit

Permalink
AES for encryping PoorSession
Browse files Browse the repository at this point in the history
  • Loading branch information
ondratu committed Jun 29, 2024
1 parent c245ccc commit 5e457cd
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 126 deletions.
20 changes: 15 additions & 5 deletions doc/documentation.rst
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,7 @@ CachedInput
~~~~~~~~~~~

When HTTP Forms are base64 encoded, FieldStorage use readline on request input
file. This is not so optimal. So there is CachedInput class, which is returned
file. This is not so optimal. So there is CachedInput class, which is returned

Proccess variables
~~~~~~~~~~~~~~~~~~
Expand Down Expand Up @@ -996,12 +996,17 @@ more times.
Sessions
~~~~~~~~
Like in mod_python, PoorSession is session class of PoorWSGI. It's
self-contained cookie which has data dictionary. Data are sent to client in
hidden, bzip2, base64 encoded format. PoorSession needs ``secret_key``,
PoorWSGI has PoorSession is session class for self-contained cookie. Data
are sent to client encrypted with AES CTR method, and signed just like JWT.
Don't forget, that poorwsgi.session module needs ``pyaes`` module from
https://pypi.org/project/pyaes/. PoorSession needs ``secret_key``,
which can be set by ``poor_SecretKey`` environment variable to
Application.secret_key property.

*Be aware that sending password to cookie is anti-pattern, so avoid to that
even if next example do it. For many situation, best practice is using JWT or
some session ID, and data should be stored in cache database.*

.. code:: python
from functools import wraps
Expand All @@ -1020,7 +1025,7 @@ Application.secret_key property.
@wraps(fn) # using wraps make right/better /debug-info page
def handler(req):
cookie = PoorSession(app.secret_key)
cookie.load()
cookie.load(req.cookies)
if "passwd" not in cookie.data: # expires or didn't set
log.info("Login cookie not found.")
redirect("/login", message=b"Login required")
Expand Down Expand Up @@ -1058,6 +1063,11 @@ Application.secret_key property.
cookie.header(response)
return response
JSON Web Tokens
```````````````



HTTP Digest Auth
~~~~~~~~~~~~~~~~

Expand Down
170 changes: 69 additions & 101 deletions poorwsgi/session.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
"""PoorSession self-contained cookie class.
:Classes: NoCompress, PoorSession
:Functions: hidden, get_token, check_token
:Classes: PoorSession
:Functions: get_token, check_token
This module is depended to pyaes https://pypi.org/project/pyaes/
"""
from hashlib import sha512, sha256
import hmac
from base64 import urlsafe_b64decode, urlsafe_b64encode
from hashlib import sha256, sha3_256
from http.cookies import SimpleCookie
from json import dumps, loads
from base64 import b64decode, b64encode
from logging import getLogger
from time import time
from typing import Union, Dict, Any, Optional
from typing import Any, Dict, Optional, Union

import bz2

from http.cookies import SimpleCookie
from pyaes import AESModeOfOperationCTR # type: ignore

from poorwsgi.headers import Headers
from poorwsgi.request import Request
from poorwsgi.response import Response

log = getLogger("poorwsgi") # pylint: disable=invalid-name
Expand All @@ -26,38 +27,9 @@
# pylint: disable=consider-using-f-string


def hidden(text: Union[str, bytes], passwd: Union[str, bytes]) -> bytes:
"""(en|de)crypt text with sha hash of passwd via xor.
Arguments:
text : str or bytes
raw data to (en|de)crypt
passwd : str or bytes
password
"""
if isinstance(passwd, bytes):
passwd = sha512(passwd).digest()
else:
passwd = sha512(passwd.encode("utf-8")).digest()
passlen = len(passwd)

# text must be bytes
if isinstance(text, str):
text = text.encode("utf-8")

if isinstance(text, str): # if text is str
retval = ''
for i, val in enumerate(text):
retval += chr(ord(val) ^ ord(passwd[i % passlen]))
else: # if text is bytes
retval = bytearray()
for i, val in enumerate(text):
retval.append(val ^ passwd[i % passlen])

return retval


def get_token(secret: str, client: str, timeout: Optional[int] = None,
def get_token(secret: str,
client: str,
timeout: Optional[int] = None,
expired: int = 0):
"""Create token from secret, and client string.
Expand All @@ -68,14 +40,16 @@ def get_token(secret: str, client: str, timeout: Optional[int] = None,
text = "%s%s" % (secret, client)
else:
if expired == 0:
now = int(time() / timeout) * timeout # shift to start time
now = int(time() / timeout) * timeout # shift to start time
expired = now + 2 * timeout
text = "%s%s%s" % (secret, expired, client)

return sha256(text.encode()).hexdigest()


def check_token(token: str, secret: str, client: str,
def check_token(token: str,
secret: str,
client: str,
timeout: Optional[int] = None):
"""Check token, if it is right.
Expand All @@ -100,31 +74,13 @@ class SessionError(RuntimeError):
"""Base Exception for Session"""


class NoCompress:
"""Fake compress class/module whith two static method for PoorSession.
If compress parameter is None, this class is use.
"""

@staticmethod
def compress(data, compresslevel=0): # pylint: disable=unused-argument
"""Get two params, data, and compresslevel. Method only return data."""
return data

@staticmethod
def decompress(data):
"""Get one parameter data, which returns."""
return data


class PoorSession:
"""Self-contained cookie with session data.
You cat store or read data from object via PoorSession.data variable which
must be dictionary. Data is stored to cookie by pickle dump, and next
hidden with app.secret_key. So it must be set on Application object or with
poor_SecretKey environment variable. Be careful with stored object. You can
add object with little python trick:
must be dictionary. Data is stored to cookie by json dump, and next encrypt
with by AES CTR method with `secret_key`. Session data are signed just like
JWT.
.. code:: python
Expand Down Expand Up @@ -160,10 +116,15 @@ def export(self):
obj.import(sess.data['dict'])
"""

def __init__(self, secret_key: Union[Request, str, bytes],
expires: int = 0, max_age: Optional[int] = None,
domain: str = '', path: str = '/', secure: bool = False,
same_site: bool = False, compress=bz2, sid: str = 'SESSID'):
def __init__(self,
secret_key: Union[str, bytes],
expires: int = 0,
max_age: Optional[int] = None,
domain: str = '',
path: str = '/',
secure: bool = False,
same_site: bool = False,
sid: str = 'SESSID'):
"""Constructor.
Arguments:
Expand All @@ -182,10 +143,6 @@ def __init__(self, secret_key: Union[Request, str, bytes],
The ``SameSite`` attribute. When is set could be one of
``Strict|Lax|None``. By default attribute is not set which is
``Lax`` by browser.
compress : compress module or class.
Could be ``bz2``, ``gzip.zlib``, or any other, which have
standard compress and decompress methods. Or it could be
``None`` to not use any compressing method.
sid : str
Cookie key name.
Expand All @@ -198,7 +155,6 @@ def __init__(self, secret_key: Union[Request, str, bytes],
'path': '/application',
̈́'secure': True,
'same_site': True,
'compress': gzip,
'sid': 'MYSID'
}
Expand All @@ -210,60 +166,72 @@ def __init__(self, secret_key: Union[Request, str, bytes],
*Changed in version 2.4.x*: use app.secret_key in constructor, and than
call load method.
*Changed in version 2.7.0*:
* Using AES encryption with signature just like in JWT.
* Removing compression
* Use secret key have to be string or bytes.
"""
if not isinstance(secret_key, (str, bytes)): # backwards compatibility
log.warning('Do not use request in PoorSession constructor, '
'see new api and call load method manually.')
if secret_key.secret_key is None:
raise SessionError("poor_SecretKey is not set!")
self.__secret_key = secret_key.secret_key
else:
self.__secret_key = secret_key
if not secret_key:
raise SessionError("Empty secret_key")
if isinstance(secret_key, str):
secret_key = secret_key.encode('utf-8')

self.__secret_key = sha3_256(secret_key).digest()
self.__sid = sid
self.__expires = expires
self.__max_age = max_age
self.__domain = domain
self.__path = path
self.__secure = secure
self.__same_site = same_site
self.__cps = compress if compress is not None else NoCompress

# data is session dictionary to store user data in cookie
self.data: Dict[Any, Any] = {}
self.cookie: SimpleCookie = SimpleCookie()
self.cookie[sid] = ''

if not isinstance(secret_key, (str, bytes)): # backwards compatibility
self.load(secret_key.cookies)

def load(self, cookies: Optional[SimpleCookie]):
"""Load session from request's cookie"""
if not isinstance(cookies, SimpleCookie) or self.__sid not in cookies:
return
raw = cookies[self.__sid].value

if raw:
try:
self.data = loads(hidden(self.__cps.decompress
(b64decode(raw.encode())),
self.__secret_key))
except Exception as err:
log.info(repr(err))
raise SessionError("Bad session data.") from err
if not raw:
return

try:
# payload, signature = map(urlsafe_b64decode,
# raw.encode('utf-8').split(b'.'))
payload, signature = raw.encode('utf-8').split(b'.')
payload = urlsafe_b64decode(payload)
signature = urlsafe_b64decode(signature)

digest = hmac.digest(self.__secret_key, payload, digest=sha256)
if not hmac.compare_digest(digest, signature):
raise RuntimeError("Invalid Signature")

if not isinstance(self.data, dict):
raise SessionError("Cookie data is not dictionary!")
aes = AESModeOfOperationCTR(self.__secret_key)
self.data = loads(aes.decrypt(payload).decode('utf-8'))

except Exception as err:
log.info(repr(err))
raise SessionError("Bad session data.") from err

if not isinstance(self.data, dict):
raise SessionError("Cookie data is not dictionary!")

def write(self):
"""Store data to cookie value.
This method is called automatically in header method.
"""
raw = b64encode(self.__cps.compress(hidden(dumps(self.data),
self.__secret_key), 9))
raw = raw if isinstance(raw, str) else raw.decode()
self.cookie[self.__sid] = raw
aes = AESModeOfOperationCTR(self.__secret_key)
payload = aes.encrypt(dumps(self.data))
digest = hmac.digest(self.__secret_key, payload, digest=sha256)
raw = urlsafe_b64encode(payload) + b'.' + urlsafe_b64encode(digest)

self.cookie[self.__sid] = raw.decode('utf-8')
self.cookie[self.__sid]['HttpOnly'] = True

if self.__domain:
Expand Down Expand Up @@ -306,8 +274,8 @@ def header(self, headers: Optional[Union[Headers, Response]] = None):
cookies = self.cookie.output().split('\r\n')
retval = []
for cookie in cookies:
var = cookie[:10] # Set-Cookie
val = cookie[12:] # SID=###; expires=###; Path=/
var = cookie[:10] # Set-Cookie
val = cookie[12:] # SID=###; expires=###; Path=/
retval.append((var, val))
if headers:
headers.add_header(var, val)
Expand Down
3 changes: 2 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,5 +225,6 @@ def doc():
'test': PyTest},
tests_require=['pytest', 'requests', 'openapi-core', 'simplejson'],
extras_require={
'JSONGeneratorResponse': ['simplejson']}
'JSONGeneratorResponse': ['simplejson'],
'Session': ['pyaes']}
)
23 changes: 4 additions & 19 deletions tests/test_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ class Request:
cookies: Any = SimpleCookie()


class Empty:
"""Request mock without secret key."""
secret_key = None


@fixture
def req():
"""Instance of Request object."""
Expand Down Expand Up @@ -128,7 +123,7 @@ class TestErrors:

def test_no_secret_key(self):
with raises(SessionError):
PoorSession(Empty)
PoorSession(None)

def test_bad_session(self):
cookies = SimpleCookie()
Expand All @@ -138,25 +133,15 @@ def test_bad_session(self):
with raises(SessionError):
session.load(cookies)

def test_bad_session_compatibility(self, req):
req.cookies = SimpleCookie()
req.cookies["SESSID"] = "\0"

with raises(SessionError):
PoorSession(req)


class TestLoadWrite:
"""Tests of load and write methods."""

def test_compatibility_empty(self, req):
session = PoorSession(req)
def test_empty(self):
session = PoorSession(SECRET_KEY)
session.load(SimpleCookie())
assert session.data == {}

def test_compatibility(self, req_session):
session = PoorSession(req_session)
assert session.data == {'test': True}

def test_write_load(self, req_session):
"""Method write was called in fixture req_session."""
session = PoorSession(SECRET_KEY)
Expand Down

0 comments on commit 5e457cd

Please sign in to comment.