-
Notifications
You must be signed in to change notification settings - Fork 22
/
yutils.py
153 lines (122 loc) · 3.93 KB
/
yutils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
import asyncio
from enum import IntEnum
from pathlib import Path
from typing import Optional
import aiofiles.os # type: ignore
import y_py as Y
class YMessageType(IntEnum):
    """Value of the first byte of a Y protocol message (top-level kind)."""

    # Document synchronisation traffic; create_message() always uses this one.
    SYNC = 0
    # Awareness traffic. NOTE(review): not referenced elsewhere in this
    # module -- presumably handled by callers; confirm.
    AWARENESS = 1
class YSyncMessageType(IntEnum):
    """Sub-type byte that follows ``YMessageType.SYNC`` in a sync message."""

    # Carries a state vector; sync() sends this to open the exchange.
    SYNC_STEP1 = 0
    # Reply to SYNC_STEP1, carrying an encoded document update.
    SYNC_STEP2 = 1
    # Carries a document update; applied the same way as SYNC_STEP2.
    SYNC_UPDATE = 2
def write_var_uint(num: int) -> bytes:
    """Encode *num* as a variable-length unsigned integer.

    The value is emitted seven bits at a time, least-significant group first.
    Every byte except the last has its high bit set to signal that more bytes
    follow.
    """
    encoded = []
    remaining = num
    while remaining > 0x7F:
        # Low seven bits plus the continuation flag.
        encoded.append((remaining & 0x7F) | 0x80)
        remaining >>= 7
    # Final group: high bit clear, which terminates the integer.
    encoded.append(remaining)
    return bytes(encoded)
def create_message(data: bytes, msg_type: int) -> bytes:
    """Frame *data* as a sync message of sub-type *msg_type*.

    Layout: the ``YMessageType.SYNC`` byte, the *msg_type* byte, the payload
    length as a variable-length unsigned integer, then the payload itself.
    """
    header = bytes([YMessageType.SYNC, msg_type])
    length_prefix = write_var_uint(len(data))
    return header + length_prefix + data
def create_sync_step1_message(data: bytes) -> bytes:
    """Frame *data* as a ``SYNC_STEP1`` sync message."""
    step = YSyncMessageType.SYNC_STEP1
    return create_message(data, step)
def create_sync_step2_message(data: bytes) -> bytes:
    """Frame *data* as a ``SYNC_STEP2`` sync message."""
    step = YSyncMessageType.SYNC_STEP2
    return create_message(data, step)
def create_update_message(data: bytes) -> bytes:
    """Frame *data* as a ``SYNC_UPDATE`` sync message."""
    step = YSyncMessageType.SYNC_UPDATE
    return create_message(data, step)
def read_message(stream: bytes) -> bytes:
    """Return the first length-prefixed message contained in *stream*.

    The decoder reports an empty stream as ``None``; that case trips the
    assertion below instead of being returned to the caller.
    """
    decoder = Decoder(stream)
    payload = decoder.read_message()
    assert payload is not None
    return payload
class Decoder:
    """Sequential reader for a byte stream of length-prefixed messages.

    ``i0`` is the offset of the next unread byte and ``length`` is the number
    of bytes not yet consumed; both are advanced as values are read.
    """

    def __init__(self, stream: bytes):
        self.stream = stream
        self.length = len(stream)
        self.i0 = 0

    def read_var_uint(self) -> int:
        """Read one variable-length unsigned integer from the stream.

        Each byte contributes its low seven bits, least-significant group
        first; a byte below 128 terminates the integer.

        Raises:
            RuntimeError: if the stream is exhausted before the integer is
                complete (including when nothing is left to read at all).
        """
        uint = 0
        shift = 0
        while True:
            # Checked before every byte, not only the first one, so that an
            # integer cut off after a continuation byte is reported as a
            # protocol error rather than surfacing as an IndexError.
            if self.length <= 0:
                raise RuntimeError("Y protocol error")
            byte = self.stream[self.i0]
            self.i0 += 1
            self.length -= 1
            uint += (byte & 127) << shift
            shift += 7
            if byte < 128:
                return uint

    def read_message(self) -> Optional[bytes]:
        """Read the next length-prefixed message.

        Returns:
            The message payload, ``b""`` for a zero-length message, or
            ``None`` when the stream has been fully consumed.

        Raises:
            RuntimeError: if the length prefix is truncated, or if it
                announces more bytes than remain in the stream.
        """
        if self.length == 0:
            return None
        length = self.read_var_uint()
        if length == 0:
            return b""
        # Slicing past the end would silently hand back a short payload and
        # drive ``self.length`` negative; reject the malformed frame instead.
        if length > self.length:
            raise RuntimeError("Y protocol error")
        i1 = self.i0 + length
        message = self.stream[self.i0 : i1]  # noqa
        self.i0 = i1
        self.length -= length
        return message

    def read_messages(self):
        """Yield every remaining message until the stream is exhausted."""
        while True:
            message = self.read_message()
            if message is None:
                return
            yield message

    def read_var_string(self) -> str:
        """Read the next message and decode it as UTF-8.

        Returns an empty string when the stream has been fully consumed.
        """
        message = self.read_message()
        if message is None:
            return ""
        return message.decode("utf-8")
def put_updates(update_queue: asyncio.Queue, ydoc: Y.YDoc, event: Y.AfterTransactionEvent) -> None:
    """Push the update carried by *event* onto *update_queue* without waiting.

    *ydoc* is accepted but not used by this function.
    """
    update = event.get_update()
    update_queue.put_nowait(update)
async def process_sync_message(message: bytes, ydoc: Y.YDoc, websocket, log) -> None:
    """Handle one incoming sync message for *ydoc*.

    The first byte of *message* is the sync sub-type and the rest is a
    length-prefixed payload.

    * ``SYNC_STEP1``: the payload is used as a state vector; the matching
      update is computed from *ydoc* and sent back over *websocket* as a
      ``SYNC_STEP2`` message.
    * ``SYNC_STEP2`` / ``SYNC_UPDATE``: the payload is applied to *ydoc* as
      an update, unless it is the empty update.

    A first byte that is not a ``YSyncMessageType`` value raises
    ``ValueError`` while the debug log arguments are being built.
    """
    kind = message[0]
    payload = message[1:]
    log.debug(
        "Received %s message from endpoint: %s",
        YSyncMessageType(kind).name,
        websocket.path,
    )

    if kind == YSyncMessageType.SYNC_STEP1:
        remote_state = read_message(payload)
        diff = Y.encode_state_as_update(ydoc, remote_state)
        reply = create_sync_step2_message(diff)
        log.debug(
            "Sending %s message to endpoint: %s",
            YSyncMessageType.SYNC_STEP2.name,
            websocket.path,
        )
        await websocket.send(reply)
        return

    if kind not in (YSyncMessageType.SYNC_STEP2, YSyncMessageType.SYNC_UPDATE):
        return

    update = read_message(payload)
    # Ignore empty updates (see https://github.com/y-crdt/ypy/issues/98)
    if update == b"\x00\x00":
        return
    Y.apply_update(ydoc, update)
async def sync(ydoc: Y.YDoc, websocket, log):
    """Send a ``SYNC_STEP1`` message with *ydoc*'s state vector over *websocket*."""
    step1 = create_sync_step1_message(Y.encode_state_vector(ydoc))
    log.debug(
        "Sending %s message to endpoint: %s",
        YSyncMessageType.SYNC_STEP1.name,
        websocket.path,
    )
    await websocket.send(step1)
async def get_new_path(path: str) -> str:
    """Return a variant of *path* carrying a ``(N)`` counter that is not taken.

    The counter is inserted between the stem and the final suffix, e.g.
    ``dir/doc.txt`` -> ``dir/doc(1).txt``. ``N`` starts at 1 and increases
    until the resulting file name is absent from the directory that contains
    *path*.

    Args:
        path: Path of the file for which a free sibling name is wanted.

    Returns:
        The first candidate path, as a string, whose file name does not
        appear in the parent directory listing.
    """
    p = Path(path)
    ext = p.suffix
    p_noext = p.with_suffix("")
    # List the directory that actually contains ``path``. Listing the current
    # working directory would only be right for bare file names.
    try:
        existing = set(await aiofiles.os.listdir(str(p.parent)))
    except FileNotFoundError:
        # A directory that does not exist cannot contain a colliding name.
        existing = set()
    i = 1
    while True:
        # Directory listings hold bare entry names, so compare the file name
        # only; the returned value keeps the directory part.
        if f"{p_noext.name}({i}){ext}" not in existing:
            return f"{p_noext}({i}){ext}"
        i += 1