-
Notifications
You must be signed in to change notification settings - Fork 2
/
lambda_function.py
46 lines (43 loc) · 1.52 KB
/
lambda_function.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import json
from vehicle_command_controller import VehicleCommandController
from vehicle_command_controller import ControllerException
from vehicle_command_controller import BadMethodException
import threading
import os
import settings
def lambda_handler(event, context):
    """Handle a Lambda invocation by dispatching the event as a command.

    The incoming ``event`` is forwarded unchanged as the request body of a
    ``POST /`` dispatch with an empty set of query parameters.

    Args:
        event: The JSON-decoded parameters handed over by AWS API Gateway.
        context: Lambda runtime context; not used by this handler.

    Returns:
        The response body serialized as a JSON string.
    """
    # The method, path and query parameters are fixed here; this assumes the
    # API gateway is configured to deliver POST requests to this function.
    http_method = 'POST'
    route = '/'
    no_query_params = {}

    try:
        _status_code, response_body = dispatch_method(
            http_method, route, event, no_query_params
        )
    except json.decoder.JSONDecodeError:
        _status_code = 400
        response_body = {'error': 'Bad Request, invalid JSON'}

    # NOTE(review): only the body is returned; the status code is computed
    # but never included in the result. If this function is wired up through
    # an API gateway Lambda proxy integration, confirm that a bare JSON
    # string is really the expected return format.
    return json.dumps(response_body)
def dispatch_method(method, path, params, query_params):
    """Run a vehicle command for a POST request and return its response.

    Args:
        method: HTTP method name; only ``'POST'`` is accepted.
        path: Request path; accepted but not used here.
        params: Command parameters handed to the controller's
            ``post_command``.
        query_params: Query-string parameters; accepted but not used here.

    Returns:
        The controller's ``response`` on success. When a
        ``ControllerException`` is caught, a ``(status_code, body)`` tuple
        whose body is ``{"error": <message>}``.
    """
    try:
        # Raised inside the ``try`` on purpose so that it goes through the
        # same ``except`` clause as controller failures.
        if method != 'POST':
            raise BadMethodException(f'HTTP {method} not supported')

        controller = VehicleCommandController()
        controller.post_command(params)
        controller.execute_command()
        return controller.response
    except ControllerException as failure:
        error_body = {"error": str(failure)}
        return (failure.status_code, error_body)
if __name__ == "__main__":
    # Manual run: send an "unlock" command using the PIN taken from the
    # SUBARU_PIN environment variable, then print the handler's result.
    context = None
    event = dict(pin=os.getenv("SUBARU_PIN"), command='unlock')
    print(lambda_handler(event, context))