forked from EmpireProject/Empire
-
-
Notifications
You must be signed in to change notification settings - Fork 584
/
Copy pathuser_api.py
180 lines (144 loc) · 5.07 KB
/
user_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from datetime import timedelta
from fastapi import Depends, File, HTTPException, UploadFile
from fastapi.security import OAuth2PasswordRequestForm
from starlette import status
from empire.server.api.api_router import APIRouter
from empire.server.api.jwt_auth import (
ACCESS_TOKEN_EXPIRE_MINUTES,
CurrentActiveUser,
Token,
authenticate_user,
create_access_token,
get_current_active_admin_user,
get_current_active_user,
get_password_hash,
)
from empire.server.api.v2.shared_dependencies import CurrentSession
from empire.server.api.v2.shared_dto import BadRequestResponse, NotFoundResponse
from empire.server.api.v2.user.user_dto import (
User,
UserPostRequest,
Users,
UserUpdatePasswordRequest,
UserUpdateRequest,
domain_to_dto_user,
)
from empire.server.core.db import models
from empire.server.server import main
user_service = main.usersv2
# no prefix so /token can be at root.
# Might also just move auth out of user router.
router = APIRouter(
tags=["users"],
responses={
404: {"description": "Not found", "model": NotFoundResponse},
400: {"description": "Bad request", "model": BadRequestResponse},
},
)
async def get_user(uid: int, db: CurrentSession):
user = user_service.get_by_id(db, uid)
if user:
return user
raise HTTPException(status_code=404, detail=f"User not found for id {uid}")
@router.post("/token", response_model=Token)
async def login_for_access_token(
db: CurrentSession,
form_data: OAuth2PasswordRequestForm = Depends(),
):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/api/v2/users/me", response_model=User)
async def read_user_me(current_user: CurrentActiveUser):
return domain_to_dto_user(current_user)
@router.get(
"/api/v2/users",
response_model=Users,
dependencies=[Depends(get_current_active_user)],
)
async def read_users(db: CurrentSession):
users = [domain_to_dto_user(x) for x in user_service.get_all(db)]
return {"records": users}
@router.get(
"/api/v2/users/{uid}",
response_model=User,
dependencies=[Depends(get_current_active_user)],
)
async def read_user(uid: int, db_user: models.User = Depends(get_user)):
return domain_to_dto_user(db_user)
@router.post(
"/api/v2/users/",
status_code=201,
dependencies=[Depends(get_current_active_admin_user)],
)
async def create_user(user: UserPostRequest, db: CurrentSession):
resp, err = user_service.create_user(
db, user.username, get_password_hash(user.password), user.is_admin
)
if err:
raise HTTPException(status_code=400, detail=err)
return domain_to_dto_user(resp)
@router.put("/api/v2/users/{uid}", response_model=User)
async def update_user(
uid: int,
user_req: UserUpdateRequest,
current_user: CurrentActiveUser,
db: CurrentSession,
db_user: models.User = Depends(get_user),
):
if not (current_user.admin or current_user.id == uid):
raise HTTPException(
status_code=403, detail="User does not have access to update this resource."
)
if user_req.is_admin != db_user.admin and not current_user.admin:
raise HTTPException(
status_code=403,
detail="User does not have access to update admin status.",
)
# update
resp, err = user_service.update_user(db, db_user, user_req)
if err:
raise HTTPException(status_code=400, detail=err)
return domain_to_dto_user(resp)
@router.put("/api/v2/users/{uid}/password", response_model=User)
async def update_user_password(
uid: int,
user_req: UserUpdatePasswordRequest,
current_user: CurrentActiveUser,
db: CurrentSession,
db_user: models.User = Depends(get_user),
):
if not current_user.id == uid:
raise HTTPException(
status_code=403, detail="User does not have access to update this resource."
)
# update
resp, err = user_service.update_user_password(
db, db_user, get_password_hash(user_req.password)
)
if err:
raise HTTPException(status_code=400, detail=err)
return domain_to_dto_user(resp)
@router.post("/api/v2/users/{uid}/avatar", status_code=201)
async def create_avatar(
uid: int,
user: CurrentActiveUser,
db: CurrentSession,
file: UploadFile = File(...),
):
if not user.id == uid:
raise HTTPException(
status_code=403, detail="User does not have access to update this resource."
)
if not file.content_type.startswith("image/"):
raise HTTPException(status_code=400, detail="File must be an image.")
user_service.update_user_avatar(db, user, file)