-
Notifications
You must be signed in to change notification settings - Fork 15
/
base.py
354 lines (307 loc) · 11.9 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
# -*- coding: utf-8 -*-
from bda.cache import ICacheManager
from bda.cache.interfaces import INullCacheProvider
from node.ext.ldap.cache import nullcacheProviderFactory
from node.ext.ldap.interfaces import ICacheProviderFactory
from node.ext.ldap.properties import LDAPProps
from zope.component import queryUtility
import hashlib
import ldap
import logging
import six
logger = logging.getLogger('node.ext.ldap')
def testLDAPConnectivity(server=None, port=None, props=None):
"""Function to test the availability of the LDAP Server.
:param server: Server IP or name
:param port: LDAP port
:param props: LDAPProps object. If given, server and port are ignored.
:return object: Either string 'success' if connectivity, otherwise ldap
error instance.
"""
if props is None:
props = LDAPProps(server=server, port=port)
try:
c = LDAPConnector(props=props)
lc = LDAPCommunicator(c)
lc.bind()
lc.unbind()
return 'success'
except ldap.LDAPError as error:
return error
def md5digest(key):
"""Abbrev to create a md5 hex digest.
:param key: Key to create a md5 hex digest for.
:return digest: hex digest.
"""
try:
m = hashlib.md5()
except ValueError:
m = hashlib.md5(usedforsecurity=False)
m.update(ensure_bytes(key))
return m.hexdigest()
def cache_key(parts):
def dec(p):
if isinstance(p, bytes):
p = p.decode('utf-8', 'replace')
elif isinstance(p, (list, tuple)):
p = u'-'.join([dec(_) for _ in p])
elif not isinstance(p, six.text_type):
p = six.text_type(p)
return p
return u'-'.join([dec(p) for p in parts])
def ensure_text(value):
if value and not isinstance(value, six.text_type):
value = value.decode('utf-8')
return value
def ensure_bytes(value):
if value and isinstance(value, six.text_type):
value = value.encode('utf-8')
return value
def ensure_bytes_py2(value):
if six.PY2 and value and isinstance(value, six.text_type): # pragma: no cover
value = value.encode('utf-8')
return value
class LDAPConnector(object):
"""Object is responsible for the LDAP connection.
This Object knows about the LDAP Server to connect to, the authentication
information and the protocol to use.
TODO: tests for TLS/SSL Support - it should be functional.
(see also properties.py)
"""
def __init__(self, props=None):
"""Initialize LDAP connector.
:param props: ``LDAPServerProperties`` instance.
"""
self.protocol = ldap.VERSION3
self._uri = props.uri
self._bindDN = props.user
self._bindPW = props.password
self._cache = props.cache
self._cachetimeout = props.timeout
self._start_tls = props.start_tls
self._ignore_cert = props.ignore_cert
self._tls_cacert_file = props.tls_cacertfile
self._tls_cacert_dir = props.tls_cacertdir
self._tls_clcert_file = props.tls_clcertfile
self._tls_clkey_file = props.tls_clkeyfile
self._retry_max = props.retry_max
self._retry_delay = props.retry_delay
# backward compatibility:
# missing timeout properties in LDAPProps' pas.plugins.ldap <= 1.8.1
self._conn_timeout = getattr(props, "conn_timeout", -1)
self._op_timeout = getattr(props, "op_timeout", -1)
self._con = None
def bind(self):
"""Bind to Server and return the Connection Object.
"""
if self._ignore_cert: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_NEVER)
elif self._tls_cacert_file: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, self._tls_cacert_file)
elif self._tls_cacert_dir: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, self._tls_cacert_dir)
if self._tls_clcert_file and self._tls_clkey_file: # pragma: no cover
ldap.set_option(ldap.OPT_X_TLS_CERTFILE, self._tls_clcert_file)
ldap.set_option(ldap.OPT_X_TLS_KEYFILE, self._tls_clkey_file)
elif self._tls_clcert_file or self._tls_clkey_file: # pragma: no cover
logger.exception("Only client certificate or key have been provided.")
self._con = ldap.ldapobject.ReconnectLDAPObject(
self._uri,
bytes_mode=False,
bytes_strictness='silent',
retry_max=self._retry_max,
retry_delay=self._retry_delay
)
# Turning referrals off since they cause problems with MS Active
# Directory More info: https://www.python-ldap.org/faq.html#usage
self._con.set_option(ldap.OPT_REFERRALS, 0)
self._con.protocol_version = self.protocol
# Set the connection timeout
if self._conn_timeout > 0:
self._con.set_option(ldap.OPT_NETWORK_TIMEOUT, self._conn_timeout)
# Set the operations timeout
if self._op_timeout > 0:
self._con.timeout = self._op_timeout
if self._start_tls: # pragma: no cover
# ignore in tests for now. nevertheless provide a test environment
# for TLS and SSL later
self._con.start_tls_s()
self._con.simple_bind_s(self._bindDN, self._bindPW)
return self._con
def unbind(self):
"""Unbind from Server.
"""
if self._con is None:
return
try:
self._con.unbind_s()
except AttributeError:
logger.exception("Unbind not successful.")
self._con = None
def __del__(self):
self.unbind()
class LDAPCommunicator(object):
"""Class LDAPCommunicator is responsible for the communication with the
LDAP Server.
It provides methods to search, add, modify and delete entries in the
directory.
"""
def __init__(self, connector):
"""Initialize LDAP communicator.
:param connector: ``LDAPConnector`` instance.
"""
self.baseDN = ''
self._connector = connector
self._con = None
self._cache = None
if connector._cache:
cachefactory = queryUtility(ICacheProviderFactory)
if cachefactory is None:
cachefactory = nullcacheProviderFactory
cacheprovider = cachefactory()
self._cache = ICacheManager(cacheprovider)
self._cache.setTimeout(connector._cachetimeout)
if not INullCacheProvider.providedBy(self._cache):
logger.debug(
u"LDAP Caching activated for instance '{0:s}'. "
u"Use '{1:s}' as cache provider".format(
repr(self._cache),
repr(cacheprovider)
)
)
else: # pragma: no cover
logger.debug(
u"LDAP Caching activated for instance '{0:s}'.".format(
repr(self._cache),
)
)
def bind(self):
"""Bind to LDAP Server.
"""
self._con = self._connector.bind()
def unbind(self):
"""Unbind from LDAP Server.
"""
self._connector.unbind()
self._con = None
def ensure_connection(self):
"""If LDAP directory is down, bind again and retry given function.
"""
if self._con is None:
self.bind()
def search(self, queryFilter, scope, baseDN=None,
force_reload=False, attrlist=None, attrsonly=0,
page_size=None, cookie=None):
"""Search the directory.
:param queryFilter: LDAP query filter
:param scope: LDAP search scope
:param baseDN: Search base. Defaults to ``self.baseDN``
:param force_reload: Force reload of result if cache enabled.
:param attrlist: LDAP attrlist to query.
:param attrsonly: Flag whether to return only attribute names, without
corresponding values.
:param page_size: Number of items per page, when doing pagination.
:param cookie: Cookie string returned by previous search with
pagination.
"""
if baseDN is None:
baseDN = self.baseDN
if not baseDN:
raise ValueError(u"baseDN unset.")
if page_size:
if cookie is None:
cookie = ''
pagedresults = ldap.controls.libldap.SimplePagedResultsControl(
criticality=True, size=page_size, cookie=cookie)
serverctrls = [pagedresults]
else:
if cookie:
raise ValueError('cookie passed without page_size')
serverctrls = []
def _search(baseDN, scope, queryFilter,
attrlist, attrsonly, serverctrls):
# we have to do async search to also retrieve server controls
# in case we do pagination of results
self.ensure_connection()
if type(attrlist) in (list, tuple):
attrlist = [str(_) for _ in attrlist]
try:
msgid = self._con.search_ext(
baseDN,
scope,
queryFilter,
attrlist,
attrsonly,
serverctrls=serverctrls
)
except ldap.LDAPError as e:
logger.warn(str(e))
return []
rtype, results, rmsgid, rctrls = self._con.result3(msgid)
ctype = ldap.controls.libldap.SimplePagedResultsControl.controlType
pctrls = [c for c in rctrls if c.controlType == ctype]
if pctrls:
return results, pctrls[0].cookie
else:
return results
args = [baseDN, scope, queryFilter, attrlist, attrsonly, serverctrls]
if self._cache:
key_items = [
self._connector._bindDN,
baseDN,
sorted(attrlist or []),
attrsonly,
queryFilter,
scope,
page_size,
cookie
]
return self._cache.getData(
_search,
md5digest(cache_key(key_items)),
force_reload,
args
)
return _search(*args)
def add(self, dn, data):
"""Insert an entry into directory.
:param dn: Adding DN
:param data: Dict containing key/value pairs of entry attributes
"""
attributes = [(k, v) for k, v in data.items()]
self.ensure_connection()
self._con.add_s(dn, attributes)
def modify(self, dn, modlist):
"""Modify an existing entry in the directory.
Takes the DN of the entry and the modlist, which is a list of tuples
containing modifation descriptions. The first element gives the type
of the modification (MOD_REPLACE, MOD_DELETE, or MOD_ADD), the second
gives the name of the field to modify, and the third gives the new
value for the field (for MOD_ADD and MOD_REPLACE).
"""
self.ensure_connection()
self._con.modify_s(dn, modlist)
def delete(self, deleteDN):
"""Delete an entry from the directory.
Take the DN to delete from the directory as argument.
"""
self.ensure_connection()
self._con.delete_s(deleteDN)
def passwd(self, userdn, oldpw, newpw):
self.ensure_connection()
self._con.passwd_s(userdn, oldpw, newpw)
def main():
"""Use this module from command line for testing the connectivity to the
LDAP Server.
Expects server and port as arguments.
"""
import sys
if len(sys.argv) < 3:
res = 'usage: python base.py [server] [port]'
else:
server, port = sys.argv[1:]
res = testLDAPConnectivity(server, int(port))
print(res)
return res
if __name__ == "__main__": # pragma: no cover
main()