-
Notifications
You must be signed in to change notification settings - Fork 86
/
interface.py
364 lines (313 loc) · 12.9 KB
/
interface.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
import time
from functools import wraps
import redis
from redis import RedisError
from . import logger
from .exceptions import UnavailableDataError, MissingClientError
BLOCKING_ATTEMPT_ERROR_THRESHOLD = 10
BLOCKING_ATTEMPT_SUPPRESSION = BLOCKING_ATTEMPT_ERROR_THRESHOLD + 5
def blockable(f):
"""
"blocking" decorator for Redis accessor methods. Wrapped functions that specify kwarg 'blocking'
will wait for the specified accessor to return with data.::
class SonicV2Connector:
@blockable
def keys(self, db_id):
# ...
# call with:
db = SonicV2Connector()
# ...
db.keys('DATABASE', blocking=True)
"""
@wraps(f)
def wrapped(inst, db_id, *args, **kwargs):
blocking = kwargs.pop('blocking', False)
attempts = 0
while True:
try:
ret_data = f(inst, db_id, *args, **kwargs)
inst._unsubscribe_keyspace_notification(db_id)
return ret_data
except UnavailableDataError as e:
if blocking:
if db_id in inst.keyspace_notification_channels:
result = inst._unavailable_data_handler(db_id, e.data)
if result:
continue # received updates, try to read data again
else:
inst._unsubscribe_keyspace_notification(db_id)
raise # No updates was received. Raise exception
else: # Subscribe to updates and try it again (avoiding race condition)
inst._subscribe_keyspace_notification(db_id)
else:
return None
except redis.exceptions.ResponseError:
"""
A response error indicates that something is fundamentally wrong with the request itself.
Retrying the request won't pass unless the schema itself changes. In this case, the error
should be attributed to the application itself. Re-raise the error.
"""
logger.exception("Bad DB request [{}:{}]{{ {} }}".format(db_id, f.__name__, str(args)))
raise
except (redis.exceptions.RedisError, OSError):
attempts += 1
inst._connection_error_handler(db_id)
msg = "DB access failure by [{}:{}]{{ {} }}".format(db_id, f.__name__, str(args))
if BLOCKING_ATTEMPT_ERROR_THRESHOLD < attempts < BLOCKING_ATTEMPT_SUPPRESSION:
# Repeated access failures implies the database itself is unhealthy.
logger.exception(msg=msg)
else:
logger.warning(msg=msg)
return wrapped
class DBRegistry(dict):
def __getitem__(self, item):
if item not in self:
raise MissingClientError("No client connected for db_id '{}'".format(item))
return dict.__getitem__(self, item)
class DBInterface(object):
REDIS_HOST = '127.0.0.1'
"""
SONiC does not use a password-protected database. By default, Redis will only allow connections to unprotected
DBs over the loopback ip.
"""
REDIS_PORT = 6379
"""
SONiC uses the default port.
"""
REDIS_UNIX_SOCKET_PATH = "/var/run/redis/redis.sock"
"""
SONiC uses the default unix socket.
"""
CONNECT_RETRY_WAIT_TIME = 10
"""
Wait period in seconds before attempting to reconnect to Redis.
"""
DATA_RETRIEVAL_WAIT_TIME = 3
"""
Wait period in seconds to wait before attempting to retrieve missing data.
"""
PUB_SUB_NOTIFICATION_TIMEOUT = 10.0 # seconds
"""
Time to wait for any given message to arrive via pub-sub.
"""
PUB_SUB_MAXIMUM_DATA_WAIT = 60.0 # seconds
"""
Maximum allowable time to wait on a specific pub-sub notification.
"""
KEYSPACE_PATTERN = '__key*__:*'
"""
Pub-sub keyspace pattern
"""
KEYSPACE_EVENTS = 'KEA'
"""
In Redis, by default keyspace events notifications are disabled because while not
very sensible the feature uses some CPU power. Notifications are enabled using
the notify-keyspace-events of redis.conf or via the CONFIG SET.
In order to enable the feature a non-empty string is used, composed of multiple characters,
where every character has a special meaning according to the following table:
K - Keyspace events, published with __keyspace@<db>__ prefix.
E - Keyevent events, published with __keyevent@<db>__ prefix.
g - Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
$ - String commands
l - List commands
s - Set commands
h - Hash commands
z - Sorted set commands
x - Expired events (events generated every time a key expires)
e - Evicted events (events generated when a key is evicted for maxmemory)
A - Alias for g$lshzxe, so that the "AKE" string means all the events.
ACS Redis db mainly uses hash, therefore h is selected.
"""
def __init__(self, **kwargs):
super(DBInterface, self).__init__()
# Store the arguments for redis client
self.redis_kwargs = kwargs
if len(self.redis_kwargs) == 0:
self.redis_kwargs['unix_socket_path'] = self.REDIS_UNIX_SOCKET_PATH
# For thread safety as recommended by python-redis
# Create a separate client for each database
self.redis_clients = DBRegistry()
# Create a channel for receiving needed keyspace event
# notifications for each client
self.keyspace_notification_channels = DBRegistry()
def connect(self, db_id, retry_on=True):
"""
:param db_id: database id to connect to
:param retry_on: if ``True`` -- will attempt to connect continuously.
if ``False``, only one attempt will be made.
"""
if retry_on:
self._persistent_connect(db_id)
else:
self._onetime_connect(db_id)
def _onetime_connect(self, db_id):
"""
Connect to database id.
"""
if db_id is None:
raise ValueError("No database ID configured for '{}'".format(db_id))
client = redis.StrictRedis(db=db_id, **self.redis_kwargs)
# Enable the notification mechanism for keyspace events in Redis
client.config_set('notify-keyspace-events', self.KEYSPACE_EVENTS)
self.redis_clients[db_id] = client
def _persistent_connect(self, db_id):
"""
Keep reconnecting to Database 'db_id' until success
"""
while True:
try:
self._onetime_connect(db_id)
return
except RedisError:
t_wait = self.CONNECT_RETRY_WAIT_TIME
logger.warning("Connecting to DB '{}' failed, will retry in {}s".format(db_id, t_wait))
self.close(db_id)
time.sleep(t_wait)
def close(self, db_id):
"""
Close all client(s) / keyspace channels.
:param db_id: DB to disconnect from.
"""
if db_id in self.redis_clients:
self.redis_clients[db_id].connection_pool.disconnect()
if db_id in self.keyspace_notification_channels:
self.keyspace_notification_channels[db_id].close()
def _subscribe_keyspace_notification(self, db_id):
"""
Subscribe the chosent client to keyspace event notifications
"""
logger.debug("Subscribe to keyspace notification")
client = self.redis_clients[db_id]
pubsub = client.pubsub()
pubsub.psubscribe(self.KEYSPACE_PATTERN)
self.keyspace_notification_channels[db_id] = pubsub
def _unsubscribe_keyspace_notification(self, db_id):
"""
Unsubscribe the chosent client from keyspace event notifications
"""
if db_id in self.keyspace_notification_channels:
logger.debug("Unsubscribe from keyspace notification")
self.keyspace_notification_channels[db_id].close()
del self.keyspace_notification_channels[db_id]
def get_redis_client(self, db_id):
"""
:param db_id: Name of the DB to query
:return: The Redis client instance.
"""
return self.redis_clients[db_id]
def publish(self, db_id, channel, message):
"""
Publish message via the channel
"""
client = self.redis_clients[db_id]
return client.publish(channel, message)
def expire(self, db_id, key, timeout_sec):
"""
Set a timeout on a key
"""
client = self.redis_clients[db_id]
return client.expire(key, timeout_sec)
def exists(self, db_id, key):
"""
Check if a key exist in the db
"""
client = self.redis_clients[db_id]
return client.exists(key)
@blockable
def keys(self, db_id, pattern='*'):
"""
Retrieve all the keys of DB %db_id
"""
client = self.redis_clients[db_id]
keys = client.keys(pattern=pattern)
if not keys:
message = "DB '{}' is empty!".format(db_id)
logger.warning(message)
raise UnavailableDataError(message, b'hset')
else:
return keys
@blockable
def get(self, db_id, _hash, key):
"""
Retrieve the value of Key %key from Hashtable %hash
in Database %db_id
Parameter %blocking indicates whether to wait
when the query fails
"""
client = self.redis_clients[db_id]
val = client.hget(_hash, key)
if not val:
message = "Key '{}' field '{}' unavailable in database '{}'".format(_hash, key, db_id)
logger.warning(message)
raise UnavailableDataError(message, _hash)
else:
# redis only supports strings. if any item is set to string 'None', cast it back to the appropriate type.
return None if val == b'None' else val
@blockable
def get_all(self, db_id, _hash):
"""
Get Hashtable %hash from DB %db_id
Parameter %blocking indicates whether to wait
if the hashtable has not been created yet
"""
client = self.redis_clients[db_id]
table = client.hgetall(_hash)
if not table:
message = "Key '{}' unavailable in database '{}'".format(_hash, db_id)
logger.warning(message)
raise UnavailableDataError(message, _hash)
else:
# redis only supports strings. if any item is set to string 'None', cast it back to the appropriate type.
return {k: None if v == b'None' else v for k, v in table.items()}
@blockable
def set(self, db_id, _hash, key, val):
"""
Add %(key, val) to Hashtable %hash in DB %db_id
Parameter %blocking indicates whether to retry in case of failure
"""
client = self.redis_clients[db_id]
return client.hset(_hash, key, val)
@blockable
def delete(self, db_id, key):
"""
Delete %key from DB %db_id
Parameter %blocking indicates whether to retry in case of failure
"""
client = self.redis_clients[db_id]
return client.delete(key)
@blockable
def delete_all_by_pattern(self, db_id, pattern):
"""
Delete all keys which match %pattern from DB %db_id
Parameter %blocking indicates whether to retry in case of failure
"""
client = self.redis_clients[db_id]
keys = client.keys(pattern)
for key in keys:
client.delete(key)
def _unavailable_data_handler(self, db_id, data):
"""
When the queried config is not available in Redis--wait until it is available.
Two timeouts are at work here:
1. Notification timeout - how long to wait before giving up on receiving any given pub-sub message.
2. Max data wait - swsssdk-specific. how long to wait for the data to populate (in absolute time)
"""
start = time.time()
logger.debug("Listening on pubsub channel '{}'".format(db_id))
while time.time() - start < self.PUB_SUB_MAXIMUM_DATA_WAIT:
msg = self.keyspace_notification_channels[db_id].get_message(timeout=self.PUB_SUB_NOTIFICATION_TIMEOUT)
if msg is not None and msg.get('data') == data:
logger.info("'{}' acquired via pub-sub. Unblocking...".format(data, db_id))
# Wait for a "settling" period before releasing the wait.
time.sleep(self.DATA_RETRIEVAL_WAIT_TIME)
return True
logger.warning("No notification for '{}' from '{}' received before timeout.".format(data, db_id))
return False
def _connection_error_handler(self, db_id):
"""
In the event Redis is unavailable, close existing connections, and try again.
"""
logger.warning('Could not connect to Redis--waiting before trying again.')
self.close(db_id)
time.sleep(self.CONNECT_RETRY_WAIT_TIME)
self.connect(db_id, True)