-
Notifications
You must be signed in to change notification settings - Fork 7
/
txt.py
438 lines (368 loc) · 16.2 KB
/
txt.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
#! /usr/bin/env python3
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author: Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
# https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.
# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.rst
"""A collection of text functions.
The functions "to_text()" and "to_bytes()" are copied from
/usr/lib/python3.10/site-packages/ansible/module_utils/_text.py (BSD license).
"""
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2024033101'
import codecs
import re
# Probe once, at import time, whether the 'surrogateescape' codec error
# handler is registered. codecs.lookup_error() raises LookupError if not.
try:
    codecs.lookup_error('surrogateescape')
except LookupError:
    HAS_SURROGATEESCAPE = False
else:
    HAS_SURROGATEESCAPE = True

import operator

# Aliases for the built-in types.
string_types = str
integer_types = int
class_types = type
text_type = str
binary_type = bytes

# Values of the `errors` argument that are not real codec error handlers and
# therefore have to be translated into one before encoding or decoding.
_COMPOSED_ERROR_HANDLERS = frozenset([
    None,
    'surrogate_or_replace',
    'surrogate_or_strict',
    'surrogate_then_replace',
])
def compile_regex(regex, key=''):
    """Compile a regex given as a string, or each regex of an iterable.

    If `regex` is a `str`, a single result tuple is returned. Anything else
    is iterated, and a list with one result tuple per item is returned.

    A result tuple is `(True, <compiled pattern>)` on success, or
    `(False, <error message>)` if `re.compile()` raised `re.error`.
    A non-empty `key` is added in parentheses to the error message to help
    identify the offending regex.
    """
    def _try_compile(pattern):
        """Compile a single pattern string into a result tuple."""
        try:
            compiled = re.compile(pattern)
        except re.error as err:
            qualifier = ' ({})'.format(key) if key else ''
            message = '`{}`{} contains one or more errors: {}'.format(
                pattern,
                qualifier,
                err,
            )
            return (False, message)
        return (True, compiled)

    if not isinstance(regex, str):
        return [_try_compile(item) for item in regex]
    return _try_compile(regex)
def extract_str(s, from_txt, to_txt, include_fromto=False, be_tolerant=True):
    r"""Extract the text between `from_txt` and `to_txt`.

    The search for `to_txt` starts right after the first occurrence of
    `from_txt`.

    If `include_fromto` is False (default), the text is returned without both
    search terms, otherwise `from_txt` and `to_txt` are included.

    If `from_txt` is not found, an empty string is always returned.

    If `to_txt` is not found and `be_tolerant` is True (default), the text
    from `from_txt` to the end of the input is returned. Otherwise an empty
    string is returned.

    >>> extract_str('abcde', 'x', 'y')
    ''
    >>> extract_str('abcde', 'b', 'x')
    'cde'
    >>> extract_str('abcde', 'b', 'b')
    'cde'
    >>> extract_str('abcde', 'b', 'x', include_fromto=True)
    'bcde'
    >>> extract_str('abcde', 'b', 'x', include_fromto=True, be_tolerant=False)
    ''
    >>> extract_str('abcde', 'b', 'd')
    'c'
    >>> extract_str('abcde', 'b', 'd', include_fromto=True)
    'bcd'
    >>> extract_str('<b>bold</b>', '<b>', '</b>')
    'bold'
    >>> s = ' Time zone: UTC (UTC, +0000)\nSystem clock synchronized: yes\n NTP service: active\n'
    >>> extract_str(s, 'System clock synchronized: ', '\n', include_fromto=True)
    'System clock synchronized: yes\n'
    """
    start = s.find(from_txt)
    if start == -1:
        # from_txt not found
        return ''

    content_start = start + len(from_txt)
    end = s.find(to_txt, content_start)

    if end == -1:
        # to_txt not found
        if not be_tolerant:
            return ''
        return s[start:] if include_fromto else s[content_start:]

    # from_txt and to_txt found
    if include_fromto:
        return s[start:end + len(to_txt)]
    # `end` is the index where to_txt begins, so it is already the exclusive
    # end of the text in between, regardless of the length of to_txt.
    return s[content_start:end]
def filter_mltext(_input, ignore):
    r"""Drop every line of a multi-line text that contains an ignore pattern.

    The patterns in `ignore` are plain, case-sensitive substrings (no regex).
    `ignore` has to be a list. Every remaining line is returned terminated
    by a newline character.

    >>> filter_mltext('abcde', 'a')     # "ignore" has to be a list
    ''
    >>> s = 'Lorem ipsum\ndolor sit amet\nconsectetur adipisicing'
    >>> filter_mltext(s, ['ipsum'])
    'dolor sit amet\nconsectetur adipisicing\n'
    >>> filter_mltext(s, ['dol'])
    'Lorem ipsum\nconsectetur adipisicing\n'
    >>> filter_mltext(s, ['Dol'])
    'Lorem ipsum\ndolor sit amet\nconsectetur adipisicing\n'
    >>> filter_mltext(s, ['d'])
    'Lorem ipsum\n'
    >>> s = 'Lorem ipsum'
    >>> filter_mltext(s, ['Dol'])
    'Lorem ipsum\n'
    >>> filter_mltext(s, ['ipsum'])
    ''
    """
    kept = [
        line + '\n'
        for line in _input.splitlines()
        if not any(needle in line for needle in ignore)
    ]
    return ''.join(kept)
def match_regex(regex, string, key=''):
    """Match a regex at the beginning of a string using `re.match()`.

    Returns `(True, <match object or None>)` if the regex could be applied
    (None meaning "no match"), or `(False, <error message>)` if `re.match()`
    raised `re.error` because the regex is invalid.

    A non-empty `key` is added in parentheses to the error message to help
    identify the offending regex.
    """
    try:
        return (True, re.match(regex, string))
    except re.error as e:
        # Three placeholders for three arguments: regex, optional key
        # qualifier, and the actual error reported by the `re` module.
        return (False, '`{}`{} contains one or more errors: {}'.format(
            regex,
            ' ({})'.format(key) if key else '',
            e,
        ))
def mltext2array(_input, skip_header=False, sort_key=-1):
    r"""Convert a multi-line text into a list of rows of whitespace-split
    columns.

    Leading and trailing blanks, tabs and line breaks are stripped from the
    input, which is then split into rows at each newline. If `skip_header`
    is True, the first row is discarded. If `sort_key` is not -1, the rows
    are sorted by the column with that index.

    >>> s = '1662130953 timedatex\n1662130757 python3-pip-wheel\n1662130975 python3-dateutil\n'
    >>> mltext2array(s, skip_header=False, sort_key=0)
    [['1662130757', 'python3-pip-wheel'], ['1662130953', 'timedatex'], ['1662130975', 'python3-dateutil']]
    >>> mltext2array(s, skip_header=False, sort_key=1)
    [['1662130975', 'python3-dateutil'], ['1662130757', 'python3-pip-wheel'], ['1662130953', 'timedatex']]
    """
    rows = _input.strip(' \t\n\r').split('\n')
    if skip_header:
        rows = rows[1:]
    table = [row.split() for row in rows]
    if sort_key == -1:
        return table
    table.sort(key=operator.itemgetter(sort_key))
    return table
def pluralize(noun, value, suffix='s'):
    """Return `noun` with a singular or plural ending, depending on `value`.

    The singular ending is used if `int(value)` equals 1, the plural ending
    otherwise. By default, the singular ending is empty and the plural
    ending is 's'.

    From https://kite.com/python/docs/django.template.defaultfilters.pluralize

    >>> pluralize('vote', 0)
    'votes'
    >>> pluralize('vote', 1)
    'vote'
    >>> pluralize('vote', 2)
    'votes'

    If `suffix` is provided, that string is used as the plural ending
    instead:

    >>> pluralize('class', 0, 'es')
    'classes'
    >>> pluralize('class', 1, 'es')
    'class'
    >>> pluralize('class', 2, 'es')
    'classes'

    If `suffix` contains a comma, the text before the comma is used for the
    singular case and the text after the comma is used for the plural case
    (more than one comma raises a ValueError):

    >>> pluralize('cand', 0, 'y,ies')
    'candies'
    >>> pluralize('cand', 1, 'y,ies')
    'candy'
    >>> pluralize('cand', 2, 'y,ies')
    'candies'
    >>> pluralize('', 1, 'is,are')
    'is'
    >>> pluralize('', 2, 'is,are')
    'are'
    """
    endings = suffix.split(',') if ',' in suffix else ['', suffix]
    singular_ending, plural_ending = endings
    ending = singular_ending if int(value) == 1 else plural_ending
    return noun + ending
# from /usr/lib/python3.10/site-packages/ansible/module_utils/_text.py
def to_bytes(obj, encoding='utf-8', errors=None, nonstring='simplerepr'):
    """Make sure that a string is a byte string

    :arg obj: An object to make sure is a byte string. In most cases this
        will be either a text string or a byte string. However, with
        ``nonstring='simplerepr'``, this can be used as a traceback-free
        version of ``str(obj)``.
    :kwarg encoding: The encoding to use to transform from a text string to
        a byte string. Defaults to using 'utf-8'.
    :kwarg errors: The error handler to use if the text string is not
        encodable using the specified encoding. Any valid `codecs error
        handler <https://docs.python.org/3/library/codecs.html#error-handlers>`_
        may be specified. There are three additional error strategies
        specifically aimed at helping people to port code. The first two are:

            :surrogate_or_strict: Will use ``surrogateescape`` if it is a valid
                handler, otherwise it will use ``strict``
            :surrogate_or_replace: Will use ``surrogateescape`` if it is a valid
                handler, otherwise it will use ``replace``.

        Because ``surrogateescape`` was added in Python3 this usually means that
        Python3 will use ``surrogateescape`` and Python2 will use the fallback
        error handler. Note that the code checks for ``surrogateescape`` when the
        module is imported. If you have a backport of ``surrogateescape`` for
        Python2, be sure to register the error handler prior to importing this
        module.

        The last error handler is:

            :surrogate_then_replace: Will use ``surrogateescape`` if it is a valid
                handler. If encoding with ``surrogateescape`` would traceback,
                surrogates are first replaced with a replacement characters
                and then the string is encoded using ``replace`` (which replaces
                the rest of the nonencodable bytes). If ``surrogateescape`` is
                not present it will simply use ``replace``. (Added in Ansible 2.3)
                This strategy is designed to never traceback when it attempts
                to encode a string.

        The default until Ansible-2.2 was ``surrogate_or_replace``
        From Ansible-2.3 onwards, the default is ``surrogate_then_replace``.

    :kwarg nonstring: The strategy to use if a nonstring is specified in
        ``obj``. Default is 'simplerepr'. Valid values are:

        :simplerepr: The default. This takes the ``str`` of the object and
            then returns the bytes version of that string.
        :empty: Return an empty byte string
        :passthru: Return the object passed in
        :strict: Raise a :exc:`TypeError`

    :returns: Typically this returns a byte string. If a nonstring object is
        passed in this may be a different type depending on the strategy
        specified by nonstring. This will never return a text string.
    :raises TypeError: If ``obj`` is not a string and ``nonstring`` is
        'strict' or not one of the valid values.
    :raises UnicodeEncodeError: If the text cannot be encoded and the
        requested error handler does not allow a fallback.

    .. note:: If passed a byte string, this function does not check that the
        string is valid in the specified encoding. If it's important that the
        byte string is in the specified encoding do::

            encoded_string = to_bytes(to_text(input_string, 'latin-1'), 'utf-8')

    .. version_changed:: 2.3

        Added the ``surrogate_then_replace`` error handler and made it the default error handler.
    """
    if isinstance(obj, binary_type):
        return obj

    # Keep the handler the caller asked for: the composed handlers are
    # translated into a real codec handler below, but the fallback logic and
    # the recursion for non-string objects still need the original request.
    original_errors = errors
    if errors in _COMPOSED_ERROR_HANDLERS:
        if HAS_SURROGATEESCAPE:
            errors = 'surrogateescape'
        elif errors == 'surrogate_or_strict':
            errors = 'strict'
        else:
            errors = 'replace'

    if isinstance(obj, text_type):
        try:
            # Try this first as it's the fastest
            return obj.encode(encoding, errors)
        except UnicodeEncodeError:
            if original_errors in (None, 'surrogate_then_replace'):
                # We should only reach this if encoding was non-utf8,
                # original_errors was surrogate_then_replace and errors was
                # surrogateescape.
                # Slow but works: round-trip through utf-8 to turn surrogates
                # into replacement characters, then encode with 'replace'.
                return_string = obj.encode('utf-8', 'surrogateescape')
                return_string = return_string.decode('utf-8', 'replace')
                return return_string.encode(encoding, 'replace')
            raise

    # Note: We do these last even though we have to call to_bytes again on the
    # value because we're optimizing the common case
    if nonstring == 'simplerepr':
        try:
            value = str(obj)
        except UnicodeError:
            try:
                value = repr(obj)
            except UnicodeError:
                # Giving up
                return to_bytes('')
    elif nonstring == 'passthru':
        return obj
    elif nonstring == 'empty':
        return to_bytes('')
    elif nonstring == 'strict':
        raise TypeError('obj must be a string type')
    else:
        raise TypeError('Invalid value %s for to_bytes\' nonstring parameter' % nonstring)

    # Recurse with the handler the caller originally requested, not the
    # translated one. Otherwise the `surrogate_then_replace` fallback above
    # is skipped for non-string objects and encoding can raise, although
    # that strategy is documented to never traceback.
    return to_bytes(value, encoding, original_errors)
# from /usr/lib/python3.10/site-packages/ansible/module_utils/_text.py
def to_text(obj, encoding='utf-8', errors=None, nonstring='simplerepr'):
    """Make sure that a string is a text string

    :arg obj: An object to make sure is a text string. In most cases this
        will be either a text string or a byte string. However, with
        ``nonstring='simplerepr'``, this can be used as a traceback-free
        version of ``str(obj)``.
    :kwarg encoding: The encoding to use to transform from a byte string to
        a text string. Defaults to using 'utf-8'.
    :kwarg errors: The error handler to use if the byte string is not
        decodable using the specified encoding. Any valid codecs error
        handler may be specified. Three additional error strategies are
        supported:

            :surrogate_or_strict: Will use ``surrogateescape`` if it is a
                valid handler, otherwise it will use ``strict``.
            :surrogate_or_replace: Will use ``surrogateescape`` if it is a
                valid handler, otherwise it will use ``replace``.
            :surrogate_then_replace: Does the same as
                ``surrogate_or_replace``; it exists for symmetry with the
                error handlers of ``to_bytes``.

        Whether ``surrogateescape`` is available is checked when the module
        is imported. Passing ``None`` (the default) behaves like
        ``surrogate_then_replace``.

    :kwarg nonstring: The strategy to use if a nonstring is specified in
        ``obj``. Default is 'simplerepr'. Valid values are:

        :simplerepr: The default. This takes the ``str`` of the object and
            then returns the text version of that string.
        :empty: Return an empty text string
        :passthru: Return the object passed in
        :strict: Raise a :exc:`TypeError`

    :returns: Typically this returns a text string. If a nonstring object is
        passed in this may be a different type depending on the strategy
        specified by nonstring. This will never return a byte string.
    :raises TypeError: If ``obj`` is not a string and ``nonstring`` is
        'strict' or not one of the valid values.
    """
    # Text needs no conversion at all.
    if isinstance(obj, text_type):
        return obj

    # Translate the composed handler names into a real codec error handler.
    if errors in _COMPOSED_ERROR_HANDLERS:
        if HAS_SURROGATEESCAPE:
            errors = 'surrogateescape'
        else:
            errors = 'strict' if errors == 'surrogate_or_strict' else 'replace'

    if isinstance(obj, binary_type):
        # No special handling is needed for surrogate_then_replace: all bytes
        # will either be made into surrogates or are valid to decode.
        return obj.decode(encoding, errors)

    # Non-string objects are handled last; strings are the common case.
    if nonstring == 'passthru':
        return obj
    if nonstring == 'empty':
        return ''
    if nonstring == 'strict':
        raise TypeError('obj must be a string type')
    if nonstring != 'simplerepr':
        raise TypeError('Invalid value %s for to_text\'s nonstring parameter' % nonstring)

    try:
        value = str(obj)
    except UnicodeError:
        try:
            value = repr(obj)
        except UnicodeError:
            # Giving up
            return ''
    return to_text(value, encoding, errors)
def uniq(string):
    """Remove duplicate words from a string, keeping each first occurrence.

    Words are the whitespace-separated parts of `string` and are compared
    case-sensitively. The order of the remaining words is not changed; they
    are returned joined by single blanks.

    >>> uniq('This is a test. This is a second test. And this is a third test.')
    'This is a test. second And this third'
    """
    # dict keys are unique and keep insertion order, so this keeps the first
    # occurrence of each word in a single linear pass.
    return ' '.join(dict.fromkeys(string.split()))
# `to_native` is an alias of `to_text`; the commented-out line below records
# the Python 2 equivalent.
to_native = to_text
# PY2: to_native = to_bytes