-
Notifications
You must be signed in to change notification settings - Fork 9
/
Copy pathmessaging.py
71 lines (60 loc) · 2.11 KB
/
messaging.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from aries_cloudcontroller import PingRequest, PingRequestResponse, SendMessage
from fastapi import APIRouter, Depends
from app.dependencies.acapy_clients import client_from_auth
from app.dependencies.auth import AcaPyAuth, acapy_auth_from_header
from app.exceptions.handle_acapy_call import handle_acapy_call
from app.models.messaging import Message, TrustPingMsg
from shared.log_config import get_logger
logger = get_logger(__name__)
router = APIRouter(prefix="/v1/messaging", tags=["messaging"])
@router.post("/send-message")
async def send_messages(
message: Message,
auth: AcaPyAuth = Depends(acapy_auth_from_header),
):
"""
Send basic message.
Parameters:
-----------
message: Message
payload for sending a message
Returns:
---------
The response object obtained when sending a message.
"""
logger.info("POST request received: Send message")
request_body = SendMessage(content=message.content)
async with client_from_auth(auth) as aries_controller:
await handle_acapy_call(
logger=logger,
acapy_call=aries_controller.basicmessage.send_message,
conn_id=message.connection_id,
body=request_body,
)
logger.info("Successfully sent message.")
@router.post("/trust-ping", response_model=PingRequestResponse)
async def send_trust_ping(
trustping_msg: TrustPingMsg,
auth: AcaPyAuth = Depends(acapy_auth_from_header),
):
"""
Trust ping
Parameters:
-----------
trustping_msg : TrustPingMsg
payload for sending a trust ping
Returns:
--------
The response object obtained when sending a trust ping.
"""
logger.info("POST request received: Send trust ping")
request_body = PingRequest(comment=trustping_msg.comment)
async with client_from_auth(auth) as aries_controller:
response = await handle_acapy_call(
logger=logger,
acapy_call=aries_controller.trustping.send_ping,
conn_id=trustping_msg.connection_id,
body=request_body,
)
logger.info("Successfully sent trust ping.")
return response