-
-
Notifications
You must be signed in to change notification settings - Fork 591
/
app.py
96 lines (70 loc) · 2.54 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
#!/usr/bin/env python
# Flip `instrument` to `True` to let the official Socket.IO Admin UI
# (hosted at https://admin.socket.io) connect to this server.
instrument = False

# Passed as `auth` to `sio.instrument()` when `instrument` is enabled.
admin_login = dict(
    username='admin',
    password='python',  # change this to a strong secret for production use!
)
import uvicorn
import socketio
# Socket.IO server running in ASGI mode. Extra CORS origins are only listed
# when the Admin UI is enabled; otherwise `None` is passed and the library's
# default origin handling applies.
sio = socketio.AsyncServer(
    async_mode='asgi',
    cors_allowed_origins=[
        'http://localhost:5000',
        'https://admin.socket.io',  # edit the allowed origins if necessary
    ] if instrument else None,
)

# Hook up the Admin UI only when it was requested above.
if instrument:
    sio.instrument(auth=admin_login)

# ASGI application wrapping the server; the root URL is mapped to app.html.
app = socketio.ASGIApp(sio, static_files={'/': 'app.html'})

# Set to True by the connect handler once the background task is launched,
# so that the task is started a single time.
background_task_started = False
async def background_task(interval=10):
    """Example of how to send server generated events to clients.

    Loops forever: waits ``interval`` seconds, emits a ``my_response`` event
    without a room restriction, and repeats.

    Args:
        interval: Seconds to wait between two emitted events. Defaults to
            10, the delay that used to be hard-coded, so starting the task
            without arguments behaves as before.
    """
    # The former local counter was incremented but never read, so it has
    # been dropped; the emitted payload is unchanged.
    while True:
        # Wait through the server's own sleep helper rather than blocking.
        await sio.sleep(interval)
        await sio.emit('my_response', {'data': 'Server generated event'})
@sio.on('my_event')
async def test_message(sid, message):
    """Echo the ``data`` field of a ``my_event`` message back to its sender.

    Args:
        sid: Session id of the client that sent the event.
        message: Event payload; its ``'data'`` entry is echoed back.
    """
    reply = {'data': message['data']}
    # The sender's own sid is used as the target room.
    await sio.emit('my_response', reply, room=sid)
@sio.on('my_broadcast_event')
async def test_broadcast_message(sid, message):
    """Re-emit the ``data`` field of a ``my_broadcast_event`` message.

    Args:
        sid: Session id of the client that sent the event (not used).
        message: Event payload; its ``'data'`` entry is forwarded.
    """
    payload = {'data': message['data']}
    # No room is given, so the emit is not restricted to one recipient.
    await sio.emit('my_response', payload)
@sio.on('join')
async def join(sid, message):
    """Add the calling client to a room and confirm it to that client.

    Args:
        sid: Session id of the client that asked to join.
        message: Event payload; its ``'room'`` entry names the room.
    """
    room = message['room']
    await sio.enter_room(sid, room)
    # The confirmation is built after joining and sent to the caller's sid.
    confirmation = {'data': 'Entered room: ' + room}
    await sio.emit('my_response', confirmation, room=sid)
@sio.on('leave')
async def leave(sid, message):
    """Remove the calling client from a room and confirm it to that client.

    Args:
        sid: Session id of the client that asked to leave.
        message: Event payload; its ``'room'`` entry names the room.
    """
    room = message['room']
    await sio.leave_room(sid, room)
    # The confirmation is built after leaving and sent to the caller's sid.
    confirmation = {'data': 'Left room: ' + room}
    await sio.emit('my_response', confirmation, room=sid)
@sio.on('close room')
async def close(sid, message):
    """Announce that a room is closing, then close it.

    Args:
        sid: Session id of the client that requested the closure (not used).
        message: Event payload; its ``'room'`` entry names the room.
    """
    room = message['room']
    announcement = {'data': 'Room ' + room + ' is closing.'}
    # Emit to the room first, then close it.
    await sio.emit('my_response', announcement, room=room)
    await sio.close_room(room)
@sio.on('my_room_event')
async def send_room_message(sid, message):
    """Forward the ``data`` field of a message to the room it names.

    Args:
        sid: Session id of the client that sent the event (not used).
        message: Event payload; ``'data'`` is the content to forward and
            ``'room'`` is the destination room.
    """
    payload = {'data': message['data']}
    destination = message['room']
    await sio.emit('my_response', payload, room=destination)
@sio.on('disconnect request')
async def disconnect_request(sid):
    """Disconnect the client that sent the ``disconnect request`` event.

    Args:
        sid: Session id of the client to disconnect.
    """
    await sio.disconnect(sid)
@sio.on('connect')
async def test_connect(sid, environ):
    """Greet a newly connected client, starting the background task if needed.

    Args:
        sid: Session id of the client that just connected.
        environ: Connection environment supplied by the server (not used).
    """
    global background_task_started

    # Only the first connection launches the background task; the flag is
    # raised after the task has been handed to the server.
    if not background_task_started:
        sio.start_background_task(background_task)
        background_task_started = True

    greeting = {'data': 'Connected', 'count': 0}
    await sio.emit('my_response', greeting, room=sid)
@sio.on('disconnect')
def test_disconnect(sid):
    """Print a notice to standard output when a client disconnects.

    Args:
        sid: Session id of the client that disconnected (not used).
    """
    notice = 'Client disconnected'
    print(notice)
if __name__ == '__main__':
    # When run as a script, serve the ASGI app with uvicorn on the loopback
    # address, port 5000.
    uvicorn.run(
        app,
        port=5000,
        host='127.0.0.1',
    )