Skip to content

Commit

Permalink
lololololol
Browse files Browse the repository at this point in the history
  • Loading branch information
Josh-XT committed Jun 13, 2024
1 parent a0d826f commit 86799f8
Showing 59 changed files with 7,187 additions and 3 deletions.
6 changes: 3 additions & 3 deletions MagicalAuth.py
Original file line number Diff line number Diff line change
@@ -2,9 +2,9 @@
from Models import UserInfo, Register, Login
from fastapi import Header, HTTPException
from Globals import getenv
from sso.Google import google_sso
from sso.Microsoft import microsoft_sso
from sso.GitHub import github_sso
from sso.google import google_sso
from sso.microsoft import microsoft_sso
from sso.github import github_sso
from datetime import datetime, timedelta
from fastapi import HTTPException
from sendgrid import SendGridAPIClient
152 changes: 152 additions & 0 deletions sso/amazon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import base64
import json
import requests
import logging
from email.mime.text import MIMEText
from fastapi import HTTPException
from Globals import getenv

"""
Required environment variables:
- AWS_CLIENT_ID: AWS Cognito OAuth client ID
- AWS_CLIENT_SECRET: AWS Cognito OAuth client secret
- AWS_USER_POOL_ID: AWS Cognito User Pool ID
- AWS_REGION: AWS Cognito Region
Required scopes for AWS OAuth
- openid
- email
- profile
"""

class AmazonSSO:
def __init__(
self,
access_token=None,
refresh_token=None,
):
self.access_token = access_token
self.refresh_token = refresh_token
self.client_id = getenv("AWS_CLIENT_ID")
self.client_secret = getenv("AWS_CLIENT_SECRET")
self.user_pool_id = getenv("AWS_USER_POOL_ID")
self.region = getenv("AWS_REGION")
self.user_info = self.get_user_info()

def get_new_token(self):
response = requests.post(
f"https://{self.user_pool_id}.auth.{self.region}.amazoncognito.com/oauth2/token",
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
"scope": "openid email profile",
},
)
return response.json()["access_token"]

def get_user_info(self):
uri = f"https://{self.user_pool_id}.auth.{self.region}.amazoncognito.com/oauth2/userInfo"
response = requests.get(
uri,
headers={"Authorization": f"Bearer {self.access_token}"},
)
if response.status_code == 401:
self.access_token = self.get_new_token()
response = requests.get(
uri,
headers={"Authorization": f"Bearer {self.access_token}"},
)
try:
data = response.json()
first_name = data.get("given_name", "")
last_name = data.get("family_name", "")
email = data["email"]
return {
"email": email,
"first_name": first_name,
"last_name": last_name,
}
except:
raise HTTPException(
status_code=400,
detail="Error getting user info from AWS",
)

def send_email(self, to, subject, message_text):
if not self.user_info.get("email"):
user_info = self.get_user_info()
self.email_address = user_info["email"]
message = MIMEText(message_text)
message["to"] = to
message["from"] = self.email_address
message["subject"] = subject
raw = base64.urlsafe_b64encode(message.as_bytes())
raw = raw.decode()
email_data = {
"Source": self.email_address,
"Destination": {
"ToAddresses": [to],
},
"Message": {
"Subject": {
"Data": subject,
},
"Body": {
"Text": {
"Data": message_text,
},
},
},
}
response = requests.post(
f"https://email.{self.region}.amazonaws.com/v2/email/outbound-emails",
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
},
data=json.dumps(email_data),
)
if response.status_code == 401:
self.access_token = self.get_new_token()
response = requests.post(
f"https://email.{self.region}.amazonaws.com/v2/email/outbound-emails",
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
},
data=json.dumps(email_data),
)
return response.json()


def amazon_sso(code, redirect_uri=None) -> AmazonSSO:
if not redirect_uri:
redirect_uri = getenv("MAGIC_LINK_URL")
code = (
str(code)
.replace("%2F", "/")
.replace("%3D", "=")
.replace("%3F", "?")
.replace("%3D", "=")
)
response = requests.post(
f"https://{getenv('AWS_USER_POOL_ID')}.auth.{getenv('AWS_REGION')}.amazoncognito.com/oauth2/token",
data={
"client_id": getenv("AWS_CLIENT_ID"),
"client_secret": getenv("AWS_CLIENT_SECRET"),
"code": code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
},
)
if response.status_code != 200:
logging.error(f"Error getting AWS access token: {response.text}")
return None, None
data = response.json()
access_token = data["access_token"]
refresh_token = data["refresh_token"]
return AmazonSSO(access_token=access_token, refresh_token=refresh_token)
151 changes: 151 additions & 0 deletions sso/aol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
import base64
import json
import requests
import logging
from email.mime.text import MIMEText
from fastapi import HTTPException
from Globals import getenv

"""
Required environment variables:
- AOL_CLIENT_ID: AOL OAuth client ID
- AOL_CLIENT_SECRET: AOL OAuth client secret
Note: This example assumes hypothetical OAuth and API endpoints for AOL since AOL does not provide OAuth for individual users publicly like Google or Microsoft. Replace the endpoints and scopes with the actual values if available.
Required scopes for AOL OAuth
- https://api.aol.com/userinfo.profile
- https://api.aol.com/userinfo.email
- https://api.aol.com/mail.send
"""


class AOLSSO:
def __init__(
self,
access_token=None,
refresh_token=None,
):
self.access_token = access_token
self.refresh_token = refresh_token
self.client_id = getenv("AOL_CLIENT_ID")
self.client_secret = getenv("AOL_CLIENT_SECRET")
self.user_info = self.get_user_info()

def get_new_token(self):
response = requests.post(
"https://api.login.aol.com/oauth2/token",
data={
"client_id": self.client_id,
"client_secret": self.client_secret,
"refresh_token": self.refresh_token,
"grant_type": "refresh_token",
},
)
return response.json()["access_token"]

def get_user_info(self):
uri = "https://api.aol.com/userinfo/v1/me"
response = requests.get(
uri,
headers={"Authorization": f"Bearer {self.access_token}"},
)
if response.status_code == 401:
self.access_token = self.get_new_token()
response = requests.get(
uri,
headers={"Authorization": f"Bearer {self.access_token}"},
)
try:
data = response.json()
first_name = data["names"][0]["givenName"]
last_name = data["names"][0]["familyName"]
email = data["emailAddresses"][0]["value"]
return {
"email": email,
"first_name": first_name,
"last_name": last_name,
}
except:
raise HTTPException(
status_code=400,
detail="Error getting user info from AOL",
)

def send_email(self, to, subject, message_text):
if not self.user_info.get("email"):
user_info = self.get_user_info()
self.email_address = user_info["email"]
message = MIMEText(message_text)
message["to"] = to
message["from"] = self.email_address
message["subject"] = subject
raw = base64.urlsafe_b64encode(message.as_bytes())
raw = raw.decode()
email_data = {
"message": {
"subject": subject,
"body": {
"contentType": "Text",
"content": message_text,
},
"toRecipients": [
{
"emailAddress": {
"address": to,
}
}
],
},
}
response = requests.post(
"https://api.aol.com/mail/v1/messages/send",
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
},
data=json.dumps(email_data),
)
if response.status_code == 401:
self.access_token = self.get_new_token()
response = requests.post(
"https://api.aol.com/mail/v1/messages/send",
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",
},
data=json.dumps(email_data),
)
return response.json()


def aol_sso(code, redirect_uri=None) -> AOLSSO:
if not redirect_uri:
redirect_uri = getenv("MAGIC_LINK_URL")
code = (
str(code)
.replace("%2F", "/")
.replace("%3D", "=")
.replace("%3F", "?")
.replace("%3D", "=")
)
response = requests.post(
f"https://api.login.aol.com/oauth2/token",
data={
"client_id": getenv("AOL_CLIENT_ID"),
"client_secret": getenv("AOL_CLIENT_SECRET"),
"code": code,
"grant_type": "authorization_code",
"redirect_uri": redirect_uri,
"scope": "https://api.aol.com/userinfo.profile https://api.aol.com/userinfo.email https://api.aol.com/mail.send",
},
)
if response.status_code != 200:
logging.error(f"Error getting AOL access token: {response.text}")
return None, None
data = response.json()
access_token = data["access_token"]
refresh_token = data["refresh_token"] if "refresh_token" in data else "Not provided"
return AOLSSO(access_token=access_token, refresh_token=refresh_token)
Loading

0 comments on commit 86799f8

Please sign in to comment.