-
Notifications
You must be signed in to change notification settings - Fork 0
/
session.py
102 lines (80 loc) · 2.87 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import redis.asyncio as redis
import scrypt
import secrets
import os
import json
from fastapi import HTTPException, WebSocket, Depends
from fastapi.security import OAuth2PasswordBearer
from pydantic import BaseModel, constr
r = redis.Redis()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/token.json")
class Session(BaseModel):
token: str
public_key: str
class APIKeypair(BaseModel):
public_key: constr(min_length=46, max_length=46)
secret_key: constr(min_length=46, max_length=46)
async def getSession(token: str = Depends(oauth2_scheme)):
public_key = await r.get(f"bearer_{token}")
if(public_key == None):
raise HTTPException(status_code=401)
return Session(token=token, public_key=public_key)
async def getSessionWS(websocket: WebSocket):
authorization = websocket.headers.get('authorization')
invalid = {
"event": "hangup",
"message": "Invalid Authorization"
}
# Hangup if no authorization header
if(authorization == None):
await websocket.accept()
await websocket.send_json(invalid)
await websocket.close()
return
# Hangup if Bearer not formatted properly
split = authorization.split(' ')
if(len(split) != 2 or split[0].lower() != 'bearer'):
await websocket.accept()
await websocket.send_json(invalid)
await websocket.close()
return
# Hangup if Bearer is invalid
public_key = await r.get(f"bearer_{split[1]}")
if(public_key == None):
await websocket.accept()
await websocket.send_json(invalid)
await websocket.close()
return
return Session(token=split[1], public_key=public_key)
async def authenticate(keys: APIKeypair):
ttl = 3600
# Get the Encrypted Secret for the Public Key
# Raise 401 if no public key found
encrypted = await r.hget('apikeys', keys.public_key)
if(encrypted == None):
raise HTTPException(status_code=401)
# Check if the provided secret is verified
# by the encrypted stored secret. If so
# generate and store token
scrypt.decrypt(encrypted, keys.secret_key, maxtime=0.5, encoding=None)
token = secrets.token_urlsafe(32)
await r.set(f"bearer_{token}", keys.public_key, ttl)
return { "token": token, "ttl": ttl }
async def generateKeypair():
public_key = f"pk_{secrets.token_urlsafe(32)}"
secret_key = f"sk_{secrets.token_urlsafe(32)}"
encrypted = scrypt.encrypt(os.urandom(64), secret_key, maxtime=0.5)
await r.hset('apikeys', public_key, encrypted)
return { "public_key": public_key, "secret_key": secret_key }
async def generate(path):
keypair = await generateKeypair()
print(path)
with open(path, 'w') as f:
json.dump(keypair, f, indent=4)
return
async def revoke(public_key):
await r.hdel('apikeys', public_key)
return
async def purge():
await r.flushdb()
return