-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchat.py
executable file
·77 lines (53 loc) · 2.21 KB
/
chat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
from sanic import Sanic
from sanic.response import html
import socketio
import re
# Socket.IO server configured for Sanic's async mode.
sio = socketio.AsyncServer(async_mode='sanic')
# Name the application explicitly: recent Sanic releases refuse to construct
# an unnamed app, while older releases accept the argument unchanged.
app = Sanic(name='chat')
# Hook the Socket.IO server into the Sanic application.
sio.attach(app)
@app.route('/')
async def index(request):
    """Serve the chat client page.

    Reads ``chat.html`` (a relative path, so it is resolved against the
    current working directory) and returns its contents as an HTML response.

    Raises:
        OSError: if ``chat.html`` cannot be opened.
    """
    # Decode explicitly as UTF-8 instead of the platform default encoding so
    # the page is read the same way on every host.
    with open('chat.html', encoding='utf-8') as page:
        return html(page.read())
@sio.on('show username', namespace='/test')
async def test_message(sid, message):
    """Acknowledge a client's username.

    Emits a 'my response' event to the sender only (``room=sid``) whose text
    is built from ``message['data']``.
    """
    greeting = 'user ' + message['data'] + ' has connected.'
    reply = {'data': greeting}
    await sio.emit('my response', reply, room=sid, namespace='/test')
@sio.on('join room', namespace='/test')
async def join(sid, message):
    """Add the client to a room and announce the arrival to that room.

    Args:
        sid: Session id of the client that sent the event.
        message: Event payload; must provide string values under the keys
            'room' and 'username'.

    Raises:
        KeyError: if 'room' or 'username' is missing from ``message``.
        TypeError: if either value cannot be concatenated with a string.
    """
    room = message['room']
    # Build the announcement before touching room membership: a payload with
    # a missing key or a non-string value then fails here, instead of leaving
    # the client inside a room that was never announced.
    announcement = message['username'] + ' entered room: ' + room
    sio.enter_room(sid, room, namespace='/test')
    await sio.emit('my response', {'data': announcement},
                   room=room, namespace='/test')
@sio.on('leave room', namespace='/test')
async def leave(sid, message):
    """Remove the client from ``message['room']`` and confirm to the sender.

    The confirmation is a 'my response' event delivered only to the
    requesting client (``room=sid``).
    """
    room = message['room']
    sio.leave_room(sid, room, namespace='/test')
    confirmation = {'data': 'Left room: ' + room}
    await sio.emit('my response', confirmation, room=sid, namespace='/test')
# Personal rooms are recognised purely by name: exactly 32 lowercase
# alphanumeric characters.
_PERSONAL_ROOM = re.compile(r'[a-z0-9]{32}')


@sio.on('list rooms', namespace='/test')
async def roomlist(sid):
    """Send the requesting client a comma-separated list of room names.

    Iterates the rooms the manager tracks for the '/test' namespace and
    emits 'my response' to the sender only (``room=sid``). ``None`` entries
    and personal rooms are left out of the list.

    Raises:
        KeyError: if the manager holds no entry for the '/test' namespace.
    """
    visible = []
    for room in sio.manager.rooms['/test']:
        if room is None:
            continue
        # Room keys are not guaranteed to be strings (a client may have sent
        # a number); stringify so one odd room cannot break the listing.
        name = str(room)
        # Match the whole name: an unanchored search would also hide ordinary
        # rooms whose names merely contain a 32-character run.
        if _PERSONAL_ROOM.fullmatch(name):
            continue
        visible.append(name)
    await sio.emit('my response',
                   {'data': 'Rooms list: ' + ','.join(visible)},
                   room=sid, namespace='/test')
@sio.on('close room', namespace='/test')
async def close(sid, message):
    """Notify the members of ``message['room']`` and then close that room.

    The 'my response' notice is emitted to the room first; the room is
    closed afterwards.
    """
    notice = {'data': 'Room ' + message['room'] + ' is closing.'}
    await sio.emit('my response', notice,
                   room=message['room'], namespace='/test')
    await sio.close_room(message['room'], namespace='/test')
@sio.on('my room event', namespace='/test')
async def send_room_message(sid, message):
    """Relay ``message['data']`` to everyone in ``message['room']``.

    The text is forwarded unchanged inside a 'my response' event.
    """
    payload = {'data': message['data']}
    target_room = message['room']
    await sio.emit('my response', payload,
                   room=target_room, namespace='/test')
@sio.on('disconnect request', namespace='/test')
async def disconnect_request(sid):
    """Disconnect the requesting client from the '/test' namespace."""
    namespace = '/test'
    await sio.disconnect(sid, namespace=namespace)
@sio.on('disconnect', namespace='/test')
def test_disconnect(sid):
    """Print a notice to stdout when a client disconnects."""
    notice = 'Client disconnected'
    print(notice)
# Expose the local ./static directory under the /static URL prefix.
app.static('/static', './static')

# Start the server only when this file is executed as a script, not when it
# is imported.
if __name__ == '__main__':
    app.run()