From dfc4c95762f417e84dcb21dbbe6399ab7b7cef19 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 31 Mar 2023 12:18:33 -0600 Subject: [PATCH] gh-101659: Clean Up the General Import Tests for Subinterpreters (gh-103151) This involves 3 changes: some general cleanup, checks to match the kind of module, and switch from testing against sys to _imp. This is a precursor to gh-103150, though the changes are meant to stand on their own. --- Lib/test/test_import/__init__.py | 150 ++++++++++++++++++++++--------- 1 file changed, 106 insertions(+), 44 deletions(-) diff --git a/Lib/test/test_import/__init__.py b/Lib/test/test_import/__init__.py index 96815b3f758a5b..3ef07203c46c7e 100644 --- a/Lib/test/test_import/__init__.py +++ b/Lib/test/test_import/__init__.py @@ -4,6 +4,9 @@ import glob import importlib.util from importlib._bootstrap_external import _get_sourcefile +from importlib.machinery import ( + BuiltinImporter, ExtensionFileLoader, FrozenImporter, SourceFileLoader, +) import marshal import os import py_compile @@ -44,6 +47,49 @@ sys.dont_write_bytecode, "test meaningful only when writing bytecode") + +def _require_loader(module, loader, skip): + if isinstance(module, str): + module = __import__(module) + + MODULE_KINDS = { + BuiltinImporter: 'built-in', + ExtensionFileLoader: 'extension', + FrozenImporter: 'frozen', + SourceFileLoader: 'pure Python', + } + + expected = loader + assert isinstance(expected, type), expected + expected = MODULE_KINDS[expected] + + actual = module.__spec__.loader + if not isinstance(actual, type): + actual = type(actual) + actual = MODULE_KINDS[actual] + + if actual != expected: + err = f'expected module to be {expected}, got {module.__spec__}' + if skip: + raise unittest.SkipTest(err) + raise Exception(err) + return module + +def require_builtin(module, *, skip=False): + module = _require_loader(module, BuiltinImporter, skip) + assert module.__spec__.origin == 'built-in', module.__spec__ + +def require_extension(module, *, skip=False): + _require_loader(module, ExtensionFileLoader, skip) + +def require_frozen(module, *, skip=True): + module = _require_loader(module, FrozenImporter, skip) + assert module.__spec__.origin == 'frozen', module.__spec__ + +def require_pure_python(module, *, skip=False): + _require_loader(module, SourceFileLoader, skip) + + def remove_files(name): for f in (name + ".py", name + ".pyc", @@ -1437,10 +1483,10 @@ def import_script(self, name, fd, check_override=None): os.write({fd}, text.encode('utf-8')) ''') - def run_shared(self, name, *, - check_singlephase_setting=False, - check_singlephase_override=None, - ): + def run_here(self, name, *, + check_singlephase_setting=False, + check_singlephase_override=None, + ): """ Try importing the named module in a subinterpreter. @@ -1470,27 +1516,35 @@ def run_shared(self, name, *, self.assertEqual(ret, 0) return os.read(r, 100) - def check_compatible_shared(self, name, *, strict=False): + def check_compatible_here(self, name, *, strict=False): # Verify that the named module may be imported in a subinterpreter. - # (See run_shared() for more info.) - out = self.run_shared(name, check_singlephase_setting=strict) + # (See run_here() for more info.) + out = self.run_here(name, + check_singlephase_setting=strict, + ) self.assertEqual(out, b'okay') - def check_incompatible_shared(self, name): - # Differences from check_compatible_shared(): + def check_incompatible_here(self, name): + # Differences from check_compatible_here(): # * verify that import fails # * "strict" is always True - out = self.run_shared(name, check_singlephase_setting=True) + out = self.run_here(name, + check_singlephase_setting=True, + ) self.assertEqual( out.decode('utf-8'), f'ImportError: module {name} does not support loading in subinterpreters', ) - def check_compatible_isolated(self, name, *, strict=False): - # Differences from check_compatible_shared(): + def check_compatible_fresh(self, name, *, strict=False): + # Differences from check_compatible_here(): # * subinterpreter in a new process # * module has never been imported before in that process # * this tests importing the module for the first time + kwargs = dict( + **self.RUN_KWARGS, + check_multi_interp_extensions=strict, + ) _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f''' import _testcapi, sys assert ( @@ -1499,25 +1553,27 @@ def check_compatible_isolated(self, name, *, strict=False): ), repr({name!r}) ret = _testcapi.run_in_subinterp_with_config( {self.import_script(name, "sys.stdout.fileno()")!r}, - **{self.RUN_KWARGS}, - check_multi_interp_extensions={strict}, + **{kwargs}, ) assert ret == 0, ret ''')) self.assertEqual(err, b'') self.assertEqual(out, b'okay') - def check_incompatible_isolated(self, name): - # Differences from check_compatible_isolated(): + def check_incompatible_fresh(self, name): + # Differences from check_compatible_fresh(): # * verify that import fails # * "strict" is always True + kwargs = dict( + **self.RUN_KWARGS, + check_multi_interp_extensions=True, + ) _, out, err = script_helper.assert_python_ok('-c', textwrap.dedent(f''' import _testcapi, sys assert {name!r} not in sys.modules, {name!r} ret = _testcapi.run_in_subinterp_with_config( {self.import_script(name, "sys.stdout.fileno()")!r}, - **{self.RUN_KWARGS}, - check_multi_interp_extensions=True, + **{kwargs}, ) assert ret == 0, ret ''')) @@ -1528,59 +1584,65 @@ def check_incompatible_isolated(self, name): ) def test_builtin_compat(self): - module = 'sys' + # For now we avoid using sys or builtins + # since they still don't implement multi-phase init. + module = '_imp' + require_builtin(module) with self.subTest(f'{module}: not strict'): - self.check_compatible_shared(module, strict=False) - with self.subTest(f'{module}: strict, shared'): - self.check_compatible_shared(module, strict=True) + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) @cpython_only def test_frozen_compat(self): module = '_frozen_importlib' + require_frozen(module, skip=True) if __import__(module).__spec__.origin != 'frozen': raise unittest.SkipTest(f'{module} is unexpectedly not frozen') with self.subTest(f'{module}: not strict'): - self.check_compatible_shared(module, strict=False) - with self.subTest(f'{module}: strict, shared'): - self.check_compatible_shared(module, strict=True) + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module") def test_single_init_extension_compat(self): module = '_testsinglephase' + require_extension(module) with self.subTest(f'{module}: not strict'): - self.check_compatible_shared(module, strict=False) - with self.subTest(f'{module}: strict, shared'): - self.check_incompatible_shared(module) - with self.subTest(f'{module}: strict, isolated'): - self.check_incompatible_isolated(module) + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_incompatible_here(module) + with self.subTest(f'{module}: strict, fresh'): + self.check_incompatible_fresh(module) @unittest.skipIf(_testmultiphase is None, "test requires _testmultiphase module") def test_multi_init_extension_compat(self): module = '_testmultiphase' + require_extension(module) with self.subTest(f'{module}: not strict'): - self.check_compatible_shared(module, strict=False) - with self.subTest(f'{module}: strict, shared'): - self.check_compatible_shared(module, strict=True) - with self.subTest(f'{module}: strict, isolated'): - self.check_compatible_isolated(module, strict=True) + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) + with self.subTest(f'{module}: strict, fresh'): + self.check_compatible_fresh(module, strict=True) def test_python_compat(self): module = 'threading' - if __import__(module).__spec__.origin == 'frozen': - raise unittest.SkipTest(f'{module} is unexpectedly frozen') + require_pure_python(module) with self.subTest(f'{module}: not strict'): - self.check_compatible_shared(module, strict=False) - with self.subTest(f'{module}: strict, shared'): - self.check_compatible_shared(module, strict=True) - with self.subTest(f'{module}: strict, isolated'): - self.check_compatible_isolated(module, strict=True) + self.check_compatible_here(module, strict=False) + with self.subTest(f'{module}: strict, not fresh'): + self.check_compatible_here(module, strict=True) + with self.subTest(f'{module}: strict, fresh'): + self.check_compatible_fresh(module, strict=True) @unittest.skipIf(_testsinglephase is None, "test requires _testsinglephase module") def test_singlephase_check_with_setting_and_override(self): module = '_testsinglephase' + require_extension(module) def check_compatible(setting, override): - out = self.run_shared( + out = self.run_here( module, check_singlephase_setting=setting, check_singlephase_override=override, @@ -1588,7 +1650,7 @@ def check_compatible(setting, override): self.assertEqual(out, b'okay') def check_incompatible(setting, override): - out = self.run_shared( + out = self.run_here( module, check_singlephase_setting=setting, check_singlephase_override=override,