Skip to content

Commit

Permalink
feat: added simpler webapp workflow; removed session_manager
Browse files Browse the repository at this point in the history
  • Loading branch information
proffapt committed Jul 10, 2024
1 parent 5c30733 commit bd8e509
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 226 deletions.
151 changes: 57 additions & 94 deletions examples/server.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,16 @@
import logging
from flask_cors import CORS
import requests
import iitkgp_erp_login.erp as erp
import iitkgp_erp_login.utils as erp_utils
from flask import Flask, request, jsonify
from iitkgp_erp_login import session_manager


app = Flask(__name__)
CORS(app)

jwt_secret_key = "top-secret-unhackable-key"
headers = {
'timeout': '20',
'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Ubuntu Chromium/51.0.2704.79 Chrome/51.0.2704.79 Safari/537.36',
}

session_manager = session_manager.SessionManager(
jwt_secret_key=jwt_secret_key, headers=headers)


class ErpResponse:
def __init__(self, success: bool, message: str = None, data: dict = None, status_code: int = 200):
Expand All @@ -40,19 +35,6 @@ def to_response(self):
return jsonify(self.to_dict()), self.status_code


def handle_auth() -> ErpResponse:
if "Authorization" in request.headers:
header = request.headers["Authorization"].split(" ")
if len(header) == 2:
return ErpResponse(True, data={
"jwt": header[1]
}).to_response()
else:
return ErpResponse(False, "Poorly formatted authorization header. Should be of format 'Bearer <token>'", status_code=401).to_response()
else:
return ErpResponse(False, "Authentication token not provided", status_code=401).to_response()


@app.route("/secret-question", methods=["POST"])
def get_secret_question():
try:
Expand All @@ -61,11 +43,14 @@ def get_secret_question():
if not roll_number:
return ErpResponse(False, "Roll Number not provided", status_code=400).to_response()

secret_question, jwt = session_manager.get_secret_question(
roll_number)
return ErpResponse(True, data={
"secret_question": secret_question,
"jwt": jwt
session = requests.Session()
secret_question = erp.get_secret_question(
headers=headers, session=session, roll_number=roll_number, log=True)
sessionToken = erp_utils.get_cookie(session, 'JSESSIONID')

return ErpResponse(True, message="ERP Login Completed!" data={
"SECRET_QUESTION": secret_question,
"SESSION_TOKEN": sessionToken
}).to_response()
except Exception as e:
return ErpResponse(False, str(e), status_code=500).to_response()
Expand All @@ -74,19 +59,28 @@ def get_secret_question():
@app.route("/request-otp", methods=["POST"])
def request_otp():
try:
jwt = None
auth_resp, status_code = handle_auth()
if status_code != 200:
return auth_resp, status_code
else:
jwt = auth_resp.get_json().get("jwt")

password = request.form.get("password")
secret_answer = request.form.get("secret_answer")
if not all([password, secret_answer]):
return ErpResponse(False, "Missing password or secret answer", status_code=400).to_response()

session_manager.request_otp(jwt, password, secret_answer)
sessionToken = request.headers["SessionToken"]
if not sessionToken:
return ErpResponse(False, "sessionToken header not found", status_code=400).to_response()

data = request.form
roll_number = data.get("roll_number")
password = data.get("password")
secret_answer = data.get("secret_answer")
if not all([roll_number, password, secret_answer]):
return ErpResponse(False, "Missing roll_number or password or secret answer", status_code=400).to_response()

session = requests.Session()
erp_utils.set_cookie(session, 'JSESSIONID', sessionToken)
login_details = erp.get_login_details(
ROLL_NUMBER=roll_number,
PASSWORD=password,
secret_answer=secret_answer,
sessionToken=sessionToken
)
erp.request_otp(headers=headers, session=session,
login_details=login_details, log=True)

return ErpResponse(True, message="OTP has been sent to your connected email accounts").to_response()
except Exception as e:
return ErpResponse(False, str(e), status_code=500).to_response()
Expand All @@ -95,64 +89,33 @@ def request_otp():
@app.route("/login", methods=["POST"])
def login():
try:
jwt = None
auth_resp, status_code = handle_auth()
if status_code != 200:
return auth_resp, status_code
else:
jwt = auth_resp.get_json().get("jwt")

password = request.form.get("password")
secret_answer = request.form.get("secret_answer")
otp = request.form.get("otp")
if not all([secret_answer, password, otp]):
return ErpResponse(False, "Missing password, secret answer or otp", status_code=400).to_response()

session_manager.login(jwt, password, secret_answer, otp)
return ErpResponse(True, message="Logged in to ERP").to_response()
except Exception as e:
return ErpResponse(False, str(e), status_code=500).to_response()


@app.route("/logout", methods=["GET"])
def logout():
try:
jwt = None
auth_resp, status_code = handle_auth()
if status_code != 200:
return auth_resp, status_code
else:
jwt = auth_resp.get_json().get("jwt")

session_manager.end_session(jwt=jwt)

return ErpResponse(True, message="Logged out of ERP").to_response()
except Exception as e:
return ErpResponse(False, str(e), status_code=500).to_response()
sessionToken = request.headers["SessionToken"]
if not sessionToken:
return ErpResponse(False, "sessionToken header not found", status_code=400).to_response()

data = request.form
roll_number = data.get("roll_number")
password = data.get("password")
secret_answer = data.get("secret_answer")
otp = data.get("otp")
if not all([roll_number, password, secret_answer, otp]):
return ErpResponse(False, "Missing roll_number or password or secret answer or otp", status_code=400).to_response()

login_details = erp.get_login_details(
ROLL_NUMBER=roll_number,
PASSWORD=password,
secret_answer=secret_answer,
sessionToken=sessionToken
)
login_details["email_otp"] = otp

session = requests.Session()
erp_utils.set_cookie(session, 'JSESSIONID', sessionToken)
ssoToken = erp.signin(headers=headers, session=session,
login_details=login_details, log=True)

@app.route("/timetable", methods=["POST"])
def timetable():
try:
jwt = None
auth_resp, status_code = handle_auth()
if status_code != 200:
return auth_resp, status_code
else:
jwt = auth_resp.get_json().get("jwt")

_, ssoToken = session_manager.get_erp_session(jwt=jwt)

ERP_TIMETABLE_URL = "https://erp.iitkgp.ac.in/Acad/student/view_stud_time_table.jsp"
data = {
"ssoToken": ssoToken,
"module_id": '16',
"menu_id": '40',
}
r = session_manager.request(
jwt=jwt, method='POST', url=ERP_TIMETABLE_URL, headers=headers, data=data)
return ErpResponse(True, data={
"status_code": r.status_code
"ssoToken": ssoToken
}).to_response()
except Exception as e:
return ErpResponse(False, str(e), status_code=500).to_response()
125 changes: 0 additions & 125 deletions src/iitkgp_erp_login/session_manager.py

This file was deleted.

22 changes: 15 additions & 7 deletions src/iitkgp_erp_login/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,24 @@
import logging
logging.basicConfig(level=logging.INFO)

def get_cookie(session: requests.Session, cookie_name: str, **kwargs):
"""Gets the session object with given cookie."""
return session.cookies.get(cookie_name, **kwargs)

def set_cookie(session: requests.Session, cookie_name: str, cookie_value: str, **kwargs):
"""Sets the session object with given cookie."""
session.cookies.set(cookie_name, cookie_value, domain='erp.iitkgp.ac.in', **kwargs)


def populate_session_with_login_tokens(session: requests.Session, ssoToken: str):
"""Populates the session object with given login tokens."""
session.cookies.clear() # Clear all cookies from the session
# This is done to clear out old cookies + cookies irrelevant for login,
session.cookies.clear() # Clear all cookies from the session
# This is done to clear out old cookies + cookies irrelevant for login,
# i.e. cookies obtained after traversing further inside ERP
# They will be generated later, since traversing will be part of loop - if the loop exists
session.cookies.set('ssoToken', ssoToken, domain='erp.iitkgp.ac.in')

set_cookie(session, 'ssoToken', ssoToken)


def write_tokens_to_file(token_file: str, sessionToken: str, ssoToken: str, log: bool):
"""Writes session tokens to the token file if present."""
Expand Down Expand Up @@ -78,13 +86,13 @@ def generate_token():

creds = None
if os.path.exists(token_path):
creds = Credentials.from_authorized_user_file(token_path, scopes)
creds = Credentials.from_authorized_user_file(token_path, scopes)
if not creds or not creds.valid:
if creds and creds.expired and creds.refresh_token:
creds.refresh(Request())
else:
flow = InstalledAppFlow.from_client_secrets_file(credentials_path, scopes)
creds = flow.run_local_server(port=0)
creds = flow.run_local_server(port=0)

if not os.path.exists(token_path):
with open(token_path, "w") as token:
Expand Down

0 comments on commit bd8e509

Please sign in to comment.