Skip to content

Commit

Permalink
Implement support for <QuerySet>.as_manager() (#1025)
Browse files Browse the repository at this point in the history
* Implement support for `<QuerySet>.as_manager()`

* fixup! Implement support for `<QuerySet>.as_manager()`

* fixup! fixup! Implement support for `<QuerySet>.as_manager()`
  • Loading branch information
flaeppe authored Sep 29, 2022
1 parent 1f2e406 commit 54d5835
Show file tree
Hide file tree
Showing 8 changed files with 583 additions and 83 deletions.
18 changes: 8 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -145,9 +145,7 @@ And then use `AuthenticatedHttpRequest` instead of the standard `HttpRequest` fo

### My QuerySet methods are returning Any rather than my Model

`QuerySet.as_manager()` is not currently supported.

If you are using `MyQuerySet.as_manager()`, then your `Manager`/`QuerySet` methods will all not be linked to your model.
If you are using `MyQuerySet.as_manager()`:

Example:

Expand All @@ -163,12 +161,12 @@ class MyModel(models.Model):
objects = MyModelQuerySet.as_manager()


def use_my_model():
foo = MyModel.objects.get(id=1) # This is `Any` but it should be `MyModel`
return foo.xyz # No error, but there should be
def use_my_model() -> int:
foo = MyModel.objects.get(id=1) # Should now be `MyModel`
return foo.xyz # Gives an error
```

There is a workaround: use `Manager.from_queryset` instead.
Or if you're using `Manager.from_queryset`:

Example:

Expand All @@ -188,9 +186,9 @@ class MyModel(models.Model):
objects = MyModelManager()


def use_my_model():
foo = MyModel.objects.get(id=1)
return foo.xyz # Gives an error
def use_my_model() -> int:
foo = MyModel.objects.get(id=1) # Should now be `MyModel`
return foo.xyz # Gives an error
```

### How do I annotate cases where I called QuerySet.annotate?
Expand Down
20 changes: 8 additions & 12 deletions mypy_django_plugin/lib/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,29 +387,27 @@ def bind_or_analyze_type(t: MypyType, api: SemanticAnalyzer, module_name: Option


def copy_method_to_another_class(
ctx: ClassDefContext,
api: SemanticAnalyzer,
cls: ClassDef,
self_type: Instance,
new_method_name: str,
method_node: FuncDef,
return_type: Optional[MypyType] = None,
original_module_name: Optional[str] = None,
) -> bool:
semanal_api = get_semanal_api(ctx)
if method_node.type is None:
arguments, return_type = build_unannotated_method_args(method_node)
add_method_to_class(
semanal_api, ctx.cls, new_method_name, args=arguments, return_type=return_type, self_type=self_type
)
add_method_to_class(api, cls, new_method_name, args=arguments, return_type=return_type, self_type=self_type)
return True

method_type = method_node.type
if not isinstance(method_type, CallableType):
if not semanal_api.final_iteration:
semanal_api.defer()
if not api.final_iteration:
api.defer()
return False

if return_type is None:
return_type = bind_or_analyze_type(method_type.ret_type, semanal_api, original_module_name)
return_type = bind_or_analyze_type(method_type.ret_type, api, original_module_name)
if return_type is None:
return False

Expand All @@ -422,7 +420,7 @@ def copy_method_to_another_class(
zip(method_type.arg_types[1:], method_type.arg_kinds[1:], method_type.arg_names[1:]),
start=1,
):
bound_arg_type = bind_or_analyze_type(arg_type, semanal_api, original_module_name)
bound_arg_type = bind_or_analyze_type(arg_type, api, original_module_name)
if bound_arg_type is None:
return False
if arg_name is None and hasattr(method_node, "arguments"):
Expand All @@ -438,9 +436,7 @@ def copy_method_to_another_class(
)
)

add_method_to_class(
semanal_api, ctx.cls, new_method_name, args=arguments, return_type=return_type, self_type=self_type
)
add_method_to_class(api, cls, new_method_name, args=arguments, return_type=return_type, self_type=self_type)

return True

Expand Down
9 changes: 7 additions & 2 deletions mypy_django_plugin/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from mypy_django_plugin.transformers import fields, forms, init_create, meta, querysets, request, settings
from mypy_django_plugin.transformers.functional import resolve_str_promise_attribute
from mypy_django_plugin.transformers.managers import (
create_new_manager_class_from_as_manager_method,
create_new_manager_class_from_from_queryset_method,
resolve_manager_method,
)
Expand Down Expand Up @@ -301,11 +302,15 @@ def get_type_analyze_hook(self, fullname: str) -> Optional[Callable[[AnalyzeType

def get_dynamic_class_hook(self, fullname: str) -> Optional[Callable[[DynamicClassDefContext], None]]:
# Create a new manager class definition when a manager's '.from_queryset' classmethod is called
if fullname.endswith("from_queryset"):
class_name, _, _ = fullname.rpartition(".")
class_name, _, method_name = fullname.rpartition(".")
if method_name == "from_queryset":
info = self._get_typeinfo_or_none(class_name)
if info and info.has_base(fullnames.BASE_MANAGER_CLASS_FULLNAME):
return create_new_manager_class_from_from_queryset_method
elif method_name == "as_manager":
info = self._get_typeinfo_or_none(class_name)
if info and info.has_base(fullnames.QUERYSET_CLASS_FULLNAME):
return create_new_manager_class_from_as_manager_method
return None


Expand Down
190 changes: 162 additions & 28 deletions mypy_django_plugin/transformers/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
TypeInfo,
Var,
)
from mypy.plugin import AttributeContext, DynamicClassDefContext, SemanticAnalyzerPluginInterface
from mypy.plugin import AttributeContext, DynamicClassDefContext
from mypy.semanal import SemanticAnalyzer
from mypy.semanal_shared import has_placeholder
from mypy.types import AnyType, CallableType, Instance, ProperType
from mypy.types import Type as MypyType
Expand Down Expand Up @@ -150,7 +151,6 @@ def get_method_type_from_reverse_manager(


def resolve_manager_method_from_instance(instance: Instance, method_name: str, ctx: AttributeContext) -> MypyType:

api = helpers.get_typechecker_api(ctx)
method_type = get_method_type_from_dynamic_manager(
api, method_name, instance
Expand All @@ -164,9 +164,11 @@ def resolve_manager_method(ctx: AttributeContext) -> MypyType:
A 'get_attribute_hook' that is intended to be invoked whenever the TypeChecker encounters
an attribute on a class that has 'django.db.models.BaseManager' as a base.
"""
# Skip (method) type that is currently something other than Any
# Skip (method) type that is currently something other than Any of type `implementation_artifact`
if not isinstance(ctx.default_attr_type, AnyType):
return ctx.default_attr_type
elif ctx.default_attr_type.type_of_any != TypeOfAny.implementation_artifact:
return ctx.default_attr_type

# (Current state is:) We wouldn't end up here when looking up a method from a custom _manager_.
# That's why we only attempt to lookup the method for either a dynamically added or reverse manager.
Expand Down Expand Up @@ -197,12 +199,12 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte
return

# Don't redeclare the manager class if we've already defined it.
manager_node = semanal_api.lookup_current_scope(ctx.name)
if manager_node and isinstance(manager_node.node, TypeInfo):
manager_sym = semanal_api.lookup_current_scope(ctx.name)
if manager_sym and isinstance(manager_sym.node, TypeInfo):
# This is just a deferral run where our work is already finished
return

new_manager_info = create_manager_info_from_from_queryset_call(ctx.api, ctx.call, ctx.name)
new_manager_info = create_manager_info_from_from_queryset_call(semanal_api, ctx.call, ctx.name)
if new_manager_info is None:
if not ctx.api.final_iteration:
ctx.api.defer()
Expand All @@ -212,8 +214,17 @@ def create_new_manager_class_from_from_queryset_method(ctx: DynamicClassDefConte
helpers.add_new_manager_base(semanal_api, new_manager_info.fullname)


def register_dynamically_created_manager(fullname: str, manager_name: str, manager_base: TypeInfo) -> None:
manager_base.metadata.setdefault("from_queryset_managers", {})
# The `__module__` value of the manager type created by Django's
# `.from_queryset` is `django.db.models.manager`. But we put new type(s) in the
# module currently being processed, so we'll map those together through metadata.
runtime_fullname = ".".join(["django.db.models.manager", manager_name])
manager_base.metadata["from_queryset_managers"][runtime_fullname] = fullname


def create_manager_info_from_from_queryset_call(
api: SemanticAnalyzerPluginInterface, call_expr: CallExpr, name: Optional[str] = None
api: SemanticAnalyzer, call_expr: CallExpr, name: Optional[str] = None
) -> Optional[TypeInfo]:
"""
Extract manager and queryset TypeInfo from a from_queryset call.
Expand Down Expand Up @@ -247,30 +258,48 @@ def create_manager_info_from_from_queryset_call(
else:
manager_name = f"{base_manager_info.name}From{queryset_info.name}"

try:
new_manager_info = create_manager_class(api, base_manager_info, name or manager_name, call_expr.line)
except helpers.IncompleteDefnException:
return None

popuplate_manager_from_queryset(new_manager_info, queryset_info)

manager_fullname = ".".join(["django.db.models.manager", manager_name])

base_manager_info = new_manager_info.mro[1]
base_manager_info.metadata.setdefault("from_queryset_managers", {})
base_manager_info.metadata["from_queryset_managers"][manager_fullname] = new_manager_info.fullname
# Always look in global scope, as that's where we'll declare dynamic manager classes
manager_sym = api.globals.get(manager_name)
if (
manager_sym is not None
and isinstance(manager_sym.node, TypeInfo)
and manager_sym.node.has_base(base_manager_info.fullname)
and manager_sym.node.metadata.get("django", {}).get("from_queryset_manager") == queryset_info.fullname
):
# Reuse an identical, already generated, manager
new_manager_info = manager_sym.node
else:
# Create a new `TypeInfo` instance for the manager type
try:
new_manager_info = create_manager_class(
api=api,
base_manager_info=base_manager_info,
name=manager_name,
line=call_expr.line,
with_unique_name=name is not None and name != manager_name,
)
except helpers.IncompleteDefnException:
return None

populate_manager_from_queryset(new_manager_info, queryset_info)
register_dynamically_created_manager(
fullname=new_manager_info.fullname,
manager_name=manager_name,
manager_base=base_manager_info,
)

# Add the new manager to the current module
module = api.modules[api.cur_mod_id]
module.names[name or manager_name] = SymbolTableNode(
GDEF, new_manager_info, plugin_generated=True, no_serialize=False
)
if name is not None and name != new_manager_info.name:
# Unless names are equal, there's 2 symbol names that needs the manager info
module.names[name] = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True)

module.names[new_manager_info.name] = SymbolTableNode(GDEF, new_manager_info, plugin_generated=True)
return new_manager_info


def create_manager_class(
api: SemanticAnalyzerPluginInterface, base_manager_info: TypeInfo, name: str, line: int
api: SemanticAnalyzer, base_manager_info: TypeInfo, name: str, line: int, with_unique_name: bool
) -> TypeInfo:

base_manager_instance = fill_typevars(base_manager_info)
Expand All @@ -280,17 +309,24 @@ def create_manager_class(
if any(has_placeholder(type_var) for type_var in base_manager_info.defn.type_vars):
raise helpers.IncompleteDefnException

manager_info = helpers.create_type_info(name, api.cur_mod_id, bases=[base_manager_instance])
if with_unique_name:
manager_info = helpers.add_new_class_for_module(
module=api.modules[api.cur_mod_id],
name=name,
bases=[base_manager_instance],
)
else:
manager_info = helpers.create_type_info(name, api.cur_mod_id, bases=[base_manager_instance])

manager_info.line = line
manager_info.type_vars = base_manager_info.type_vars
manager_info.defn.type_vars = base_manager_info.defn.type_vars
manager_info.defn.line = line
manager_info.metaclass_type = manager_info.calculate_metaclass_type()

return manager_info


def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeInfo) -> None:
def populate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeInfo) -> None:
"""
Add methods from the QuerySet class to the manager.
"""
Expand Down Expand Up @@ -318,7 +354,7 @@ def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeI
helpers.add_new_sym_for_info(
manager_info,
name=name,
sym_type=AnyType(TypeOfAny.special_form),
sym_type=AnyType(TypeOfAny.implementation_artifact),
)

# For methods on BaseManager that return a queryset we need to update
Expand All @@ -330,5 +366,103 @@ def popuplate_manager_from_queryset(manager_info: TypeInfo, queryset_info: TypeI
helpers.add_new_sym_for_info(
manager_info,
name=method_name,
sym_type=AnyType(TypeOfAny.special_form),
sym_type=AnyType(TypeOfAny.implementation_artifact),
)


def create_new_manager_class_from_as_manager_method(ctx: DynamicClassDefContext) -> None:
"""
Insert a new manager class node for a
```
<manager name> = <QuerySet>.as_manager()
```
"""
semanal_api = helpers.get_semanal_api(ctx)
# Don't redeclare the manager class if we've already defined it.
manager_node = semanal_api.lookup_current_scope(ctx.name)
if manager_node and manager_node.type is not None:
# This is just a deferral run where our work is already finished
return

manager_sym = semanal_api.lookup_fully_qualified_or_none(fullnames.MANAGER_CLASS_FULLNAME)
assert manager_sym is not None
manager_base = manager_sym.node
if manager_base is None:
if not semanal_api.final_iteration:
semanal_api.defer()
return

assert isinstance(manager_base, TypeInfo)

callee = ctx.call.callee
assert isinstance(callee, MemberExpr)
assert isinstance(callee.expr, RefExpr)

queryset_info = callee.expr.node
if queryset_info is None:
if not semanal_api.final_iteration:
semanal_api.defer()
return

assert isinstance(queryset_info, TypeInfo)

manager_class_name = manager_base.name + "From" + queryset_info.name
current_module = semanal_api.modules[semanal_api.cur_mod_id]
existing_sym = current_module.names.get(manager_class_name)
if (
existing_sym is not None
and isinstance(existing_sym.node, TypeInfo)
and existing_sym.node.has_base(fullnames.MANAGER_CLASS_FULLNAME)
and existing_sym.node.metadata.get("django", {}).get("from_queryset_manager") == queryset_info.fullname
):
# Reuse an identical, already generated, manager
new_manager_info = existing_sym.node
else:
# Create a new `TypeInfo` instance for the manager type
try:
new_manager_info = create_manager_class(
api=semanal_api,
base_manager_info=manager_base,
name=manager_class_name,
line=ctx.call.line,
with_unique_name=True,
)
except helpers.IncompleteDefnException:
if not semanal_api.final_iteration:
semanal_api.defer()
return

populate_manager_from_queryset(new_manager_info, queryset_info)
register_dynamically_created_manager(
fullname=new_manager_info.fullname,
manager_name=manager_class_name,
manager_base=manager_base,
)

# So that the plugin will reparameterize the manager when it is constructed inside of a Model definition
helpers.add_new_manager_base(semanal_api, new_manager_info.fullname)

# Whenever `<QuerySet>.as_manager()` isn't called at class level, we want to ensure
# that the variable is an instance of our generated manager. Instead of the return
# value of `.as_manager()`. Though model argument is populated as `Any`.
# `transformers.models.AddManagers` will populate a model's manager(s), when it
# finds it on class level.
var = Var(name=ctx.name, type=Instance(new_manager_info, [AnyType(TypeOfAny.from_omitted_generics)]))
var.info = new_manager_info
var._fullname = f"{current_module.fullname}.{ctx.name}"
var.is_inferred = True
# Note: Order of `add_symbol_table_node` calls matters. Depending on what level
# we've found the `.as_manager()` call. Point here being that we want to replace the
# `.as_manager` return value with our newly created manager.
assert semanal_api.add_symbol_table_node(
ctx.name, SymbolTableNode(semanal_api.current_symbol_kind(), var, plugin_generated=True)
)
# Add the new manager to the current module
assert semanal_api.add_symbol_table_node(
# We'll use `new_manager_info.name` instead of `manager_class_name` here
# to handle possible name collisions, as it's unique.
new_manager_info.name,
# Note that the generated manager type is always inserted at module level
SymbolTableNode(GDEF, new_manager_info, plugin_generated=True),
)
Loading

0 comments on commit 54d5835

Please sign in to comment.