From b5c21e567cc6a1834bdf2b8356ebb81b2ba5559f Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Thu, 1 Jun 2023 16:15:28 +1000 Subject: [PATCH 1/4] refactor: Replace use of mox with mock in httpsource_test. --- nss_cache/sources/httpsource_test.py | 57 ++++++++-------------------- 1 file changed, 15 insertions(+), 42 deletions(-) diff --git a/nss_cache/sources/httpsource_test.py b/nss_cache/sources/httpsource_test.py index 5b045aa..1ec2f76 100644 --- a/nss_cache/sources/httpsource_test.py +++ b/nss_cache/sources/httpsource_test.py @@ -21,9 +21,7 @@ import time import unittest import pycurl -from mox3 import mox from unittest import mock -from io import BytesIO from nss_cache import error from nss_cache.maps import automount @@ -224,10 +222,13 @@ def testCreateMap(self): self.assertTrue(isinstance(self.updater.CreateMap(), passwd.PasswdMap)) -class TestShadowUpdateGetter(mox.MoxTestBase): +class TestShadowUpdateGetter(unittest.TestCase): def setUp(self): super(TestShadowUpdateGetter, self).setUp() self.updater = httpsource.ShadowUpdateGetter() + curl_patcher = mock.patch.object(curl, "CurlFetch") + self.addCleanup(curl_patcher.stop) + self.curl_fetch_mock = curl_patcher.start() def testGetParser(self): parser = self.updater.GetParser() @@ -239,28 +240,14 @@ def testCreateMap(self): self.assertTrue(isinstance(self.updater.CreateMap(), shadow.ShadowMap)) def testShadowGetUpdatesWithContent(self): - mock_conn = self.mox.CreateMockAnything() - mock_conn.setopt(mox.IgnoreArg(), mox.IgnoreArg()).MultipleTimes() - mock_conn.getinfo(pycurl.INFO_FILETIME).AndReturn(-1) - - self.mox.StubOutWithMock(pycurl, "Curl") - pycurl.Curl().AndReturn(mock_conn) - - self.mox.StubOutWithMock(curl, "CurlFetch") - - curl.CurlFetch("https://TEST_URL", mock_conn, self.updater.log).AndReturn( - [ - 200, - "", - BytesIO( - b"""usera:x::::::: + self.curl_fetch_mock.return_value = ( + 200, + "", + b"""usera:x::::::: userb:x::::::: -""" - ).getvalue(), - ] +""", ) - self.mox.ReplayAll() config = {} source = httpsource.HttpFilesSource(config) result = self.updater.GetUpdates(source, "https://TEST_URL", 1) @@ -268,28 +255,14 @@ def testShadowGetUpdatesWithContent(self): self.assertEqual(len(result), 2) def testShadowGetUpdatesWithBz2Content(self): - mock_conn = self.mox.CreateMockAnything() - mock_conn.setopt(mox.IgnoreArg(), mox.IgnoreArg()).MultipleTimes() - mock_conn.getinfo(pycurl.INFO_FILETIME).AndReturn(-1) - - self.mox.StubOutWithMock(pycurl, "Curl") - pycurl.Curl().AndReturn(mock_conn) - - self.mox.StubOutWithMock(curl, "CurlFetch") - - curl.CurlFetch("https://TEST_URL", mock_conn, self.updater.log).AndReturn( - [ - 200, - "", - BytesIO( - base64.b64decode( - "QlpoOTFBWSZTWfm+rXYAAAvJgAgQABAyABpAIAAhKm1GMoQAwRSpHIXejGQgz4u5IpwoSHzfVrsA" - ) - ).getvalue(), - ] + self.curl_fetch_mock.return_value = ( + 200, + "", + base64.b64decode( + "QlpoOTFBWSZTWfm+rXYAAAvJgAgQABAyABpAIAAhKm1GMoQAwRSpHIXejGQgz4u5IpwoSHzfVrsA" + ), ) - self.mox.ReplayAll() config = {} source = httpsource.HttpFilesSource(config) result = self.updater.GetUpdates(source, "https://TEST_URL", 1) From 4f9d297879cdad033a365816cf82a715d985c763 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 3 Jun 2023 09:56:46 +1000 Subject: [PATCH 2/4] refactor: Replace use of mox with mock in ldapsource_test. --- nss_cache/sources/ldapsource_test.py | 1136 +++++++++----------------- 1 file changed, 389 insertions(+), 747 deletions(-) diff --git a/nss_cache/sources/ldapsource_test.py b/nss_cache/sources/ldapsource_test.py index 376cc68..db35a19 100644 --- a/nss_cache/sources/ldapsource_test.py +++ b/nss_cache/sources/ldapsource_test.py @@ -23,7 +23,7 @@ import time import unittest import ldap -from mox3 import mox +from unittest import mock from nss_cache import error from nss_cache.maps import automount @@ -37,7 +37,7 @@ TEST_URI = "TEST_URI" -class TestLdapSource(mox.MoxTestBase): +class TestLdapSource(unittest.TestCase): def setUp(self): """Initialize a basic config dict.""" super(TestLdapSource, self).setUp() @@ -54,30 +54,28 @@ def setUp(self): "tls_cacertdir": "TEST_TLS_CACERTDIR", "tls_cacertfile": "TEST_TLS_CACERTFILE", } + ldap_patcher = mock.patch.object(ldap, "ldapobject", autospec=True) + self.addCleanup(ldap_patcher.stop) + self.ldap_mock = ldap_patcher.start() - def compareSPRC(self, expected_value=""): - def comparator(param): - if not isinstance(param, list): + class compareSPRC: + def __init__(self, expected_value=""): + self.expected_value = expected_value + + def __eq__(self, other): + if not isinstance(other, list): return False - sprc = param[0] + sprc = other[0] if not isinstance(sprc, ldap.controls.SimplePagedResultsControl): return False cookie = ldapsource.getCookieFromControl(sprc) - return cookie == expected_value - - return comparator + return cookie == self.expected_value def testDefaultConfiguration(self): config = {"uri": "ldap://foo"} - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="", who="") - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="ldap://foo", retry_max=3, retry_delay=5 - ).AndReturn(mock_rlo) - self.mox.ReplayAll() + source = ldapsource.LdapSource(config) self.assertEqual(source.conf["bind_dn"], ldapsource.LdapSource.BIND_DN) @@ -93,13 +91,7 @@ def testDefaultConfiguration(self): def testOverrideDefaultConfiguration(self): config = dict(self.config) config["scope"] = ldap.SCOPE_BASE - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY, uri="TEST_URI" - ).AndReturn(mock_rlo) - self.mox.ReplayAll() + source = ldapsource.LdapSource(config) self.assertEqual(source.conf["scope"], ldap.SCOPE_BASE) @@ -115,16 +107,12 @@ def testOverrideDefaultConfiguration(self): def testDebugLevelSet(self): config = dict(self.config) config["ldap_debug"] = 3 - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.set_option(ldap.OPT_DEBUG_LEVEL, 3) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY, uri="TEST_URI" - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() - source = ldapsource.LdapSource(config) + + ldapsource.LdapSource(config) + + self.ldap_mock.ReconnectLDAPObject.return_value.set_option.assert_called_with( + ldap.OPT_DEBUG_LEVEL, 3 + ) def testTrapServerDownAndRetry(self): config = dict(self.config) @@ -132,52 +120,24 @@ def testTrapServerDownAndRetry(self): config["bind_password"] = "" config["retry_delay"] = 5 config["retry_max"] = 3 - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="", who="").MultipleTimes().AndRaise( + self.ldap_mock.ReconnectLDAPObject.return_value.simple_bind_s.side_effect = ( ldap.SERVER_DOWN ) - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=3, retry_delay=5 - ).MultipleTimes().AndReturn(mock_rlo) + with mock.patch("time.sleep"): + self.assertRaises(error.SourceUnavailable, ldapsource.LdapSource, config) - self.mox.StubOutWithMock(time, "sleep") - time.sleep(5) - time.sleep(5) - - self.mox.ReplayAll() - - self.assertRaises(error.SourceUnavailable, ldapsource.LdapSource, config) + self.ldap_mock.ReconnectLDAPObject.assert_called_with( + mock.ANY, retry_max=3, retry_delay=5 + ) def testIterationOverLdapDataSource(self): config = dict(self.config) - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base=config["base"], - filterstr="TEST_FILTER", - scope="TEST_SCOPE", - attrlist="TEST_ATTRLIST", - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - dataset = [("dn", {"uid": [0]})] - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, dataset, None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, dataset, None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) source.Search( @@ -191,39 +151,16 @@ def testIterationOverLdapDataSource(self): for r in source: self.assertEqual(dataset[0][1], r) count += 1 - self.assertEqual(1, count) - def testIterationTimeout(self): + @mock.patch("time.sleep") + def testIterationTimeout(self, unused_time_mock): config = dict(self.config) config["retry_delay"] = 5 config["retry_max"] = 3 - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base=config["base"], - filterstr="TEST_FILTER", - scope="TEST_SCOPE", - attrlist="TEST_ATTRLIST", - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - dataset = [("dn", {"uid": [0]})] - mock_rlo.result3( - "TEST_RES", all=0, timeout="TEST_TIMELIMIT" - ).MultipleTimes().AndRaise(ldap.TIMELIMIT_EXCEEDED) - - self.mox.StubOutWithMock(time, "sleep") - time.sleep(5) - time.sleep(5) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=3, retry_delay=5 - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = ( + ldap.TIMELIMIT_EXCEEDED + ) source = ldapsource.LdapSource(config) source.Search( @@ -236,7 +173,6 @@ def testIterationTimeout(self): count = 0 for r in source: count += 1 - self.assertEqual(0, count) def testGetPasswdMap(self): @@ -254,7 +190,6 @@ def testGetPasswdMap(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) attrlist = [ "uid", @@ -267,38 +202,16 @@ def testGetPasswdMap(self): "loginShell", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetPasswdMap() self.assertEqual(1, len(data)) - first = data.PopItem() - self.assertEqual("test", first.name) def testGetPasswdMapWithUidAttr(self): @@ -317,7 +230,6 @@ def testGetPasswdMapWithUidAttr(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["uidattr"] = "name" attrlist = [ @@ -327,44 +239,29 @@ def testGetPasswdMapWithUidAttr(self): "gecos", "cn", "homeDirectory", + "loginShell", "fullName", "name", - "loginShell", "modifyTimestamp", ] - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() - + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetPasswdMap() self.assertEqual(1, len(data)) - first = data.PopItem() - self.assertEqual("test", first.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetPasswdMapWithShellOverride(self): test_posix_account = ( @@ -381,7 +278,6 @@ def testGetPasswdMapWithShellOverride(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["override_shell"] = "/bin/false" attrlist = [ @@ -391,43 +287,28 @@ def testGetPasswdMapWithShellOverride(self): "gecos", "cn", "homeDirectory", - "fullName", "loginShell", + "fullName", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetPasswdMap() self.assertEqual(1, len(data)) - first = data.PopItem() - self.assertEqual("/bin/false", first.shell) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetPasswdMapWithUseRid(self): test_posix_account = ( @@ -444,7 +325,6 @@ def testGetPasswdMapWithUseRid(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["use_rid"] = "1" attrlist = [ @@ -454,44 +334,29 @@ def testGetPasswdMapWithUseRid(self): "gecos", "cn", "homeDirectory", + "loginShell", "fullName", "sambaSID", - "loginShell", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetPasswdMap() self.assertEqual(1, len(data)) - first = data.PopItem() - self.assertEqual("test", first.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetPasswdMapAD(self): test_posix_account = ( @@ -508,53 +373,37 @@ def testGetPasswdMapAD(self): "whenChanged": ["20070227012807.0Z"], }, ) - config = dict(self.config) config["ad"] = "1" attrlist = [ "sAMAccountName", - "pwdLastSet", - "loginShell", "objectSid", "displayName", - "whenChanged", "unixHomeDirectory", + "pwdLastSet", + "loginShell", + "whenChanged", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() source = ldapsource.LdapSource(config) data = source.GetPasswdMap() self.assertEqual(1, len(data)) - first = data.PopItem() - self.assertEqual("test", first.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) - def testGetPasswdMapADWithOffeset(self): + def testGetPasswdMapADWithOffset(self): test_posix_account = ( "cn=test,ou=People,dc=example,dc=com", { @@ -575,46 +424,31 @@ def testGetPasswdMapADWithOffeset(self): config["offset"] = 10000 attrlist = [ "sAMAccountName", - "pwdLastSet", - "loginShell", "objectSid", "displayName", - "whenChanged", "unixHomeDirectory", + "pwdLastSet", + "loginShell", + "whenChanged", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() source = ldapsource.LdapSource(config) data = source.GetPasswdMap() self.assertEqual(1, len(data)) - first = data.PopItem() - self.assertEqual("test", first.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupMap(self): test_posix_group = ( @@ -627,42 +461,26 @@ def testGetGroupMap(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) - attrlist = ["cn", "uid", "gidNumber", "memberUid", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + attrlist = ["cn", "gidNumber", "memberUid", "uid", "modifyTimestamp"] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetGroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("testgroup", ent.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupMapWithUseRid(self): test_posix_group = ( @@ -675,50 +493,34 @@ def testGetGroupMapWithUseRid(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["use_rid"] = "1" attrlist = [ "cn", - "uid", "gidNumber", "memberUid", + "uid", "sambaSID", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetGroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("testgroup", ent.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupMapAsUser(self): test_posix_group = ( @@ -735,50 +537,34 @@ def testGetGroupMapAsUser(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["use_rid"] = "1" attrlist = [ "cn", - "uid", "gidNumber", "memberUid", + "uid", "sambaSID", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetGroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("test", ent.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupMapAD(self): test_posix_group = ( @@ -797,43 +583,32 @@ def testGetGroupMapAD(self): "whenChanged": ["20070227012807.0Z"], }, ) - config = dict(self.config) config["ad"] = "1" - attrlist = ["sAMAccountName", "objectSid", "member", "whenChanged"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + attrlist = [ + "sAMAccountName", + "member", + "objectSid", + "whenChanged", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetGroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("testgroup", ent.name) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupMapBis(self): test_posix_group = ( @@ -850,44 +625,34 @@ def testGetGroupMapBis(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["rfc2307bis"] = 1 - attrlist = ["cn", "uid", "gidNumber", "member", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + attrlist = [ + "cn", + "gidNumber", + "member", + "uid", + "modifyTimestamp", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetGroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("testgroup", ent.name) self.assertEqual(3, len(ent.members)) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupNestedNotConfigured(self): test_posix_group = ( @@ -919,34 +684,20 @@ def testGetGroupNestedNotConfigured(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["rfc2307bis"] = 1 - attrlist = ["cn", "uid", "gidNumber", "member", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group, test_child_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) + attrlist = [ + "cn", + "gidNumber", + "member", + "uid", + "modifyTimestamp", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group, test_child_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] - self.mox.ReplayAll() source = ldapsource.LdapSource(config) data = source.GetGroupMap() @@ -957,6 +708,13 @@ def testGetGroupNestedNotConfigured(self): self.assertEqual(len(datadict["testgroup"].members), 4) self.assertEqual(len(datadict["child"].members), 3) self.assertNotIn("newperson", datadict["testgroup"].members) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupNested(self): test_posix_group = ( @@ -988,36 +746,23 @@ def testGetGroupNested(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["rfc2307bis"] = 1 config["nested_groups"] = 1 config["use_rid"] = 1 - attrlist = ["cn", "uid", "gidNumber", "member", "sambaSID", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group, test_child_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) + attrlist = [ + "cn", + "gidNumber", + "member", + "uid", + "sambaSID", + "modifyTimestamp", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group, test_child_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] - self.mox.ReplayAll() source = ldapsource.LdapSource(config) data = source.GetGroupMap() @@ -1028,6 +773,13 @@ def testGetGroupNested(self): self.assertEqual(len(datadict["testgroup"].members), 7) self.assertEqual(len(datadict["child"].members), 3) self.assertIn("newperson", datadict["testgroup"].members) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupLoop(self): test_posix_group = ( @@ -1072,41 +824,32 @@ def testGetGroupLoop(self): "modifyTimestamp": ["20070227012807Z"], }, ) - config = dict(self.config) config["rfc2307bis"] = 1 config["nested_groups"] = 1 config["use_rid"] = 1 - attrlist = ["cn", "uid", "gidNumber", "member", "sambaSID", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( + attrlist = [ + "cn", + "gidNumber", + "member", + "uid", + "sambaSID", + "modifyTimestamp", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ ( ldap.RES_SEARCH_ENTRY, - [test_posix_group, test_child_group, test_loop_group], + [ + test_posix_group, + test_child_group, + test_loop_group, + ], None, [], - ) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) + ), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] - self.mox.ReplayAll() source = ldapsource.LdapSource(config) data = source.GetGroupMap() @@ -1117,6 +860,13 @@ def testGetGroupLoop(self): self.assertEqual(len(datadict["testgroup"].members), 7) self.assertEqual(len(datadict["child"].members), 3) self.assertIn("newperson", datadict["testgroup"].members) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetGroupMapBisAlt(self): test_posix_group = ( @@ -1157,53 +907,38 @@ def testGetGroupMapBisAlt(self): "modifyTimestamp", ] uidattr = ["uid"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - mock_rlo.search_ext( - base=dn_user, - filterstr="(objectClass=*)", - scope=ldap.SCOPE_BASE, - attrlist=mox.SameElementsAs(uidattr), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_group], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + (ldap.RES_SEARCH_ENTRY, [test_posix_account], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetGroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("testgroup", ent.name) self.assertEqual(1, len(ent.members)) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_has_calls( + [ + mock.call( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ), + mock.call( + base=dn_user, + filterstr="(objectClass=*)", + scope=mock.ANY, + attrlist=uidattr, + serverctrls=self.compareSPRC(), + ), + ] + ) def testGetShadowMap(self): test_shadow = ( @@ -1233,40 +968,25 @@ def testGetShadowMap(self): "userPassword", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_shadow], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_shadow], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetShadowMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("test", ent.name) self.assertEqual("p4ssw0rd", ent.passwd) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetShadowMapWithUidAttr(self): test_shadow = ( @@ -1291,48 +1011,33 @@ def testGetShadowMapWithUidAttr(self): "shadowLastChange", "shadowMin", "shadowMax", - "name", "shadowWarning", "shadowInactive", "shadowExpire", "shadowFlag", "userPassword", + "name", "modifyTimestamp", ] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_shadow], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_shadow], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetShadowMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("test", ent.name) self.assertEqual("p4ssw0rd", ent.passwd) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetNetgroupMap(self): test_posix_netgroup = ( @@ -1345,41 +1050,31 @@ def testGetNetgroupMap(self): }, ) config = dict(self.config) - attrlist = ["cn", "memberNisNetgroup", "nisNetgroupTriple", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_netgroup], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + attrlist = [ + "cn", + "memberNisNetgroup", + "nisNetgroupTriple", + "modifyTimestamp", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_netgroup], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetNetgroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("test", ent.name) self.assertEqual("(-,hax0r,) admins", ent.entries) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetNetgroupMapWithDupes(self): test_posix_netgroup = ( @@ -1392,41 +1087,31 @@ def testGetNetgroupMapWithDupes(self): }, ) config = dict(self.config) - attrlist = ["cn", "memberNisNetgroup", "nisNetgroupTriple", "modifyTimestamp"] - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr="TEST_FILTER", - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_posix_netgroup], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + attrlist = [ + "cn", + "memberNisNetgroup", + "nisNetgroupTriple", + "modifyTimestamp", + ] + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_posix_netgroup], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetNetgroupMap() self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("test", ent.name) self.assertEqual("(-,hax0r,)", ent.entries) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=mock.ANY, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetAutomountMap(self): test_automount = ( @@ -1440,41 +1125,26 @@ def testGetAutomountMap(self): config = dict(self.config) attrlist = ["cn", "automountInformation", "modifyTimestamp"] filterstr = "(objectclass=automount)" - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr=filterstr, - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_automount], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_automount], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetAutomountMap(location="TEST_BASE") self.assertEqual(1, len(data)) - ent = data.PopItem() - self.assertEqual("user", ent.key) self.assertEqual("-tcp,rw", ent.options) self.assertEqual("home:/home/user", ent.location) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=filterstr, + scope=mock.ANY, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testGetAutomountMasterMap(self): test_master_ou = ( @@ -1492,63 +1162,45 @@ def testGetAutomountMasterMap(self): }, ) config = dict(self.config) - # first search for the dn of ou=auto.master - attrlist = ["dn"] - filterstr = "(&(objectclass=automountMap)(ou=auto.master))" scope = ldap.SCOPE_SUBTREE - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr=filterstr, - scope=scope, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_master_ou], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - # then search for the entries under ou=auto.master - attrlist = ["cn", "automountInformation", "modifyTimestamp"] - filterstr = "(objectclass=automount)" - scope = ldap.SCOPE_ONELEVEL - base = "ou=auto.master,ou=automounts,dc=example,dc=com" - - mock_rlo.search_ext( - base=base, - filterstr=filterstr, - scope=scope, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_ENTRY, [test_automount], None, []) - ) - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) - ) - - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - - self.mox.ReplayAll() + self.ldap_mock.ReconnectLDAPObject.return_value.result3.side_effect = [ + (ldap.RES_SEARCH_ENTRY, [test_master_ou], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + (ldap.RES_SEARCH_ENTRY, [test_automount], None, []), + (ldap.RES_SEARCH_RESULT, None, None, []), + ] source = ldapsource.LdapSource(config) data = source.GetAutomountMasterMap() self.assertEqual(1, len(data)) - ent = data.PopItem() self.assertEqual("/home", ent.key) self.assertEqual("ou=auto.home,ou=automounts,dc=example,dc=com", ent.location) self.assertEqual(None, ent.options) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_has_calls( + [ + # first search for the dn of ou=auto.master + mock.call( + base=mock.ANY, + filterstr="(&(objectclass=automountMap)(ou=auto.master))", + scope=ldap.SCOPE_SUBTREE, + attrlist=["dn"], + serverctrls=self.compareSPRC(), + ), + # then search for the entries under ou=auto.master + mock.call( + base="ou=auto.master,ou=automounts,dc=example,dc=com", + filterstr="(objectclass=automount)", + scope=ldap.SCOPE_ONELEVEL, + attrlist=[ + "cn", + "automountInformation", + "modifyTimestamp", + ], + serverctrls=self.compareSPRC(), + ), + ] + ) def testVerify(self): attrlist = [ @@ -1558,33 +1210,28 @@ def testVerify(self): "gecos", "cn", "homeDirectory", - "fullName", "loginShell", + "fullName", "modifyTimestamp", ] filterstr = "(&TEST_FILTER(modifyTimestamp>=19700101000001Z))" - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr=filterstr, - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) + self.ldap_mock.ReconnectLDAPObject.return_value.result3.return_value = ( + ldap.RES_SEARCH_RESULT, + None, + None, + [], ) - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - self.mox.ReplayAll() source = ldapsource.LdapSource(self.config) + self.assertEqual(0, source.Verify(0)) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=filterstr, + scope=ldap.SCOPE_ONELEVEL, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) def testVerifyRID(self): attrlist = [ @@ -1594,36 +1241,31 @@ def testVerifyRID(self): "gecos", "cn", "homeDirectory", - "fullName", "loginShell", - "modifyTimestamp", + "fullName", "sambaSID", + "modifyTimestamp", ] filterstr = "(&TEST_FILTER(modifyTimestamp>=19700101000001Z))" - - mock_rlo = self.mox.CreateMock(ldap.ldapobject.ReconnectLDAPObject) - mock_rlo.simple_bind_s(cred="TEST_BIND_PASSWORD", who="TEST_BIND_DN") - mock_rlo.search_ext( - base="TEST_BASE", - filterstr=filterstr, - scope=ldap.SCOPE_ONELEVEL, - attrlist=mox.SameElementsAs(attrlist), - serverctrls=mox.Func(self.compareSPRC()), - ).AndReturn("TEST_RES") - - mock_rlo.result3("TEST_RES", all=0, timeout="TEST_TIMELIMIT").AndReturn( - (ldap.RES_SEARCH_RESULT, None, None, []) + self.ldap_mock.ReconnectLDAPObject.return_value.result3.return_value = ( + ldap.RES_SEARCH_RESULT, + None, + None, + [], ) - self.mox.StubOutWithMock(ldap, "ldapobject") - ldap.ldapobject.ReconnectLDAPObject( - uri="TEST_URI", retry_max=TEST_RETRY_MAX, retry_delay=TEST_RETRY_DELAY - ).AndReturn(mock_rlo) - config = dict(self.config) config["use_rid"] = 1 - self.mox.ReplayAll() + source = ldapsource.LdapSource(config) + self.assertEqual(0, source.Verify(0)) + self.ldap_mock.ReconnectLDAPObject.return_value.search_ext.assert_called_with( + base=mock.ANY, + filterstr=filterstr, + scope=ldap.SCOPE_ONELEVEL, + attrlist=attrlist, + serverctrls=self.compareSPRC(), + ) class TestUpdateGetter(unittest.TestCase): From 60b8886061516592af2af10bc55c4d0f6eea4716 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 3 Jun 2023 10:02:02 +1000 Subject: [PATCH 3/4] test: Wrap optional library tests with skiptest. --- nss_cache/sources/gcssource_test.py | 6 +++++- nss_cache/sources/s3source_test.py | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/nss_cache/sources/gcssource_test.py b/nss_cache/sources/gcssource_test.py index 8725eb6..421258c 100644 --- a/nss_cache/sources/gcssource_test.py +++ b/nss_cache/sources/gcssource_test.py @@ -8,10 +8,14 @@ from nss_cache.maps import group from nss_cache.maps import passwd from nss_cache.maps import shadow -from nss_cache.sources import gcssource from nss_cache.util import file_formats from nss_cache.util import timestamps +try: + from nss_cache.sources import gcssource +except Exception as e: + raise unittest.SkipTest("`gcssource` unabled to be imported: {}".format(e)) + class TestGcsSource(unittest.TestCase): def setUp(self): diff --git a/nss_cache/sources/s3source_test.py b/nss_cache/sources/s3source_test.py index 6c678f1..40d3e5f 100644 --- a/nss_cache/sources/s3source_test.py +++ b/nss_cache/sources/s3source_test.py @@ -8,7 +8,11 @@ from nss_cache.maps import group from nss_cache.maps import passwd from nss_cache.maps import shadow -from nss_cache.sources import s3source + +try: + from nss_cache.sources import s3source +except Exception as e: + raise unittest.SkipTest("s3source unable to be imported: {}".format(e)) class TestS3Source(unittest.TestCase): From 91604c546972215ddbe8bd710f755593fc9afa92 Mon Sep 17 00:00:00 2001 From: Jamie Wilkinson Date: Sat, 3 Jun 2023 10:11:12 +1000 Subject: [PATCH 4/4] test: Unskip consulsource_test GetMap. --- nss_cache/sources/consulsource.py | 3 +++ nss_cache/sources/consulsource_test.py | 1 - 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/nss_cache/sources/consulsource.py b/nss_cache/sources/consulsource.py index f277df7..7f5e547 100644 --- a/nss_cache/sources/consulsource.py +++ b/nss_cache/sources/consulsource.py @@ -202,6 +202,9 @@ def _ReadEntry(self, name, entry): return None try: + s = entry.get("members", "").decode("utf-8") + members = s.split("\n") + except AttributeError: members = entry.get("members", "").split("\n") except (ValueError, TypeError): members = [""] diff --git a/nss_cache/sources/consulsource_test.py b/nss_cache/sources/consulsource_test.py index ebdbf5c..26ab06a 100644 --- a/nss_cache/sources/consulsource_test.py +++ b/nss_cache/sources/consulsource_test.py @@ -106,7 +106,6 @@ def setUp(self): self.good_entry.members = ["foo", "bar"] self.parser = consulsource.ConsulGroupMapParser() - @unittest.skip("broken") def testGetMap(self): group_map = group.GroupMap() cache_info = StringIO(