Skip to content

Commit

Permalink
Merge pull request #429 from appsembler/john/pipeline-workflow-next
Browse files Browse the repository at this point in the history
Figures pipeline performance improvement
  • Loading branch information
johnbaldwin authored Mar 1, 2022
2 parents 9b7cee6 + 6c4e043 commit deaa5fd
Show file tree
Hide file tree
Showing 11 changed files with 635 additions and 73 deletions.
115 changes: 115 additions & 0 deletions figures/course.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""Course specific module for Figures
## This module defined a `Course` class for data retrieval
Initialy created to do the following:
1. Reduce duplication in Figures "pipeline"
2. Build stronger course context to make Figures programming easier
## Background summary
A course id is globally unique as it has the identify of the organization and
organizations are globally unique.
## Design to think about - Enrollment class
Build on Django's lazy eval for querysets to create also an `Enrollment` class
that provides interfaces for `enrollment.date_for` and abstracts this bit of
mess that enrollments and student modules do NOT associate and instead we need
to query back and forth with `user_id` and `course_id`.
"""
from __future__ import absolute_import
from django.db.models import Q
from figures.compat import CourseEnrollment, StudentModule
from figures.helpers import (
as_course_key,
as_date,
)
from figures.sites import (
get_site_for_course,
)


class Course(object):
"""Representation of a Course.
The impetus for this class was dealing with querying for course enrollment
and student module records for a specific course and for dates and date
ranges for the course
## Architecture goal
**Start simple and don't build the kitchen sink into here right away just
because this class exists**
## Data under consideration to have this class handle
* enrollments created on a date, before, after, between. However this would
just be a convenience as the `.enrollments` property returns a queryset that
can be filtered on `.created`
"""
def __init__(self, course_id):
"""
Initial version, we pass in a course ID and cast to a course key as an
instance attribute. Later on, add `CourseLike` to abstract course identity
so we can stop worrying about "Is it a string repretentation of a course or
is it a CourseKey?"
"""
self.course_key = as_course_key(course_id)

# Improvement: Consider doing lazy evaluation
self.site = get_site_for_course(self.course_id)

def __repr__(self):
return '{}.{} <{}>'.format(self.__module__,
self.__class__.__name__,
str(self.course_key))

def __str__(self):
return self.__repr__()

@property
def course_id(self):
"""Returns string representation of the course id
"""
return str(self.course_key)

@property
def enrollments(self):
"""Returns CourseEnrollment queryset for the course
"""
return CourseEnrollment.objects.filter(course_id=self.course_key)

@property
def student_modules(self):
"""Returns StudentModule queryset for enrollments in the course
"""
return StudentModule.objects.filter(course_id=self.course_key)

def student_modules_active_on_date(self, date_for):
"""Returns StudentModule queryset active on the date
Active is if there was a `created` or `modified` field for the given date
NOTE: We need to do this instead of simplly `modified__date=date_for`
because we still have to support Django 1.8/Ginkgo
"""
date_for = as_date(date_for)
q_created = Q(created__year=date_for.year,
created__month=date_for.month,
created__day=date_for.day)
q_modified = Q(modified__year=date_for.year,
modified__month=date_for.month,
modified__day=date_for.day)
return self.student_modules.filter(q_created | q_modified)

def enrollments_active_on_date(self, date_for):
"""Return CourseEnrollment queryset for enrollments active on the date
Looks for student modules modified on the specified date and returns
matching CourseEnrollment records
"""
sm = self.student_modules_active_on_date(date_for)
user_ids = sm.values('student_id').distinct()
return CourseEnrollment.objects.filter(course_id=self.course_key,
user_id__in=user_ids)
Original file line number Diff line number Diff line change
Expand Up @@ -70,16 +70,11 @@ def handle(self, *args, **options):
del kwargs['site_id'] # not implemented for experimental
else:
metrics_func = populate_daily_metrics
# try:

if options['no_delay']:
metrics_func(**kwargs)
else:
metrics_func.delay(**kwargs) # pragma: no cover
# except Exception as e: # pylint: disable=bare-except
# if options['ignore_exceptions']:
# self.print_exc("daily", dt, e.message)
# else:
# raise

print('END: Backfill Figures daily metrics metrics for: {}'.format(dt))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from textwrap import dedent

from figures.management.base import BaseBackfillCommand
from figures.tasks import update_enrollment_data
from figures.tasks import update_enrollment_data_for_site


class Command(BaseBackfillCommand):
Expand All @@ -23,8 +23,8 @@ def handle(self, *args, **options):
for site_id in self.get_site_ids(options['site']):
print('Updating EnrollmentData for site {}'.format(site_id))
if options['no_delay']:
update_enrollment_data(site_id=site_id)
update_enrollment_data_for_site(site_id=site_id)
else:
update_enrollment_data.delay(site_id=site_id) # pragma: no cover
update_enrollment_data_for_site.delay(site_id=site_id) # pragma: no cover

print('DONE: Backfill Figures EnrollmentData')
59 changes: 47 additions & 12 deletions figures/pipeline/course_daily_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@
from figures.pipeline.logger import log_error
import figures.pipeline.loaders
from figures.pipeline.enrollment_metrics import bulk_calculate_course_progress_data
from figures.pipeline.enrollment_metrics_next import (
calculate_course_progress as calculate_course_progress_next
)

from figures.serializers import CourseIndexSerializer
import figures.sites
from figures.pipeline.helpers import pipeline_date_for_rule
Expand Down Expand Up @@ -235,16 +239,32 @@ class CourseDailyMetricsExtractor(object):
BUT, we will then need to find a transform
"""

def extract(self, course_id, date_for, **_kwargs):
"""
defaults = dict(
def extract(self, course_id, date_for, ed_next=False, **_kwargs):
"""Extracts (collects) aggregated course level data
Args:
course_id (:obj:`str` or :obj:`CourseKey`): The course for which we collect data
date_for (str or date): Deprecated. Was to backfill data.
Specialized TBD backfill data will be called instead.
ed_next (bool, optional): "Enrollment Data Next" flag. If set to `True`
then we collect metrics with our updated workflow. See here:
https://github.com/appsembler/figures/issues/428
Returns:
dict with aggregate course level metrics
```
dict(
enrollment_count=data['enrollment_count'],
active_learners_today=data['active_learners_today'],
average_progress=data.get('average_progress', None),
average_days_to_complete=data.get('average_days_to_complete, None'),
num_learners_completed=data['num_learners_completed'],
)
TODO: refactor this class
```
TODO: refactor this class. It doesn't need to be a class. Can be a
standalone function
Add lazy loading method to load course enrollments
- Create a method for each metric field
"""
Expand Down Expand Up @@ -290,17 +310,31 @@ def extract(self, course_id, date_for, **_kwargs):
logger.debug(msg.format(date_for=date_for, course_id=course_id))
else:
try:
progress_data = bulk_calculate_course_progress_data(course_id=course_id,
date_for=date_for)
# This conditional check is an interim solution until we make
# the progress function configurable and able to run Figures
# plugins
if ed_next:
progress_data = calculate_course_progress_next(course_id=course_id)
else:
progress_data = bulk_calculate_course_progress_data(course_id=course_id,
date_for=date_for)
data['average_progress'] = progress_data['average_progress']
except Exception: # pylint: disable=broad-except
# Broad exception for starters. Refine as we see what gets caught
# Make sure we set the average_progres to None so that upstream
# does not think things are normal
data['average_progress'] = None
msg = ('FIGURES:FAIL bulk_calculate_course_progress_data'

if ed_next:
prog_func = 'calculate_course_progress_next'
else:
prog_func = 'bulk_calculate_course_progress_data'

msg = ('FIGURES:FAIL {prog_func}'
' date_for={date_for}, course_id="{course_id}"')
logger.exception(msg.format(date_for=date_for, course_id=course_id))
logger.exception(msg.format(prog_func=prog_func,
date_for=date_for,
course_id=course_id))

data['average_days_to_complete'] = get_average_days_to_complete(
course_id, date_for,)
Expand All @@ -319,10 +353,11 @@ def __init__(self, course_id):
self.extractor = CourseDailyMetricsExtractor()
self.site = figures.sites.get_site_for_course(self.course_id)

def get_data(self, date_for):
def get_data(self, date_for, ed_next=False):
return self.extractor.extract(
course_id=self.course_id,
date_for=date_for)
date_for=date_for,
ed_next=ed_next)

@transaction.atomic
def save_metrics(self, date_for, data):
Expand Down Expand Up @@ -350,7 +385,7 @@ def save_metrics(self, date_for, data):
cdm.clean_fields()
return (cdm, created,)

def load(self, date_for=None, force_update=False, **_kwargs):
def load(self, date_for=None, ed_next=False, force_update=False, **_kwargs):
"""
TODO: clean up how we do this. We want to be able to call the loader
with an existing data set (not having to call the extractor) but we
Expand All @@ -376,5 +411,5 @@ def load(self, date_for=None, force_update=False, **_kwargs):
# record not found, move on to creating
pass

data = self.get_data(date_for=date_for)
data = self.get_data(date_for=date_for, ed_next=ed_next)
return self.save_metrics(date_for=date_for, data=data)
68 changes: 68 additions & 0 deletions figures/pipeline/enrollment_metrics_next.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
"""This module updates Figures enrollment data and calculates aggregate progress
* It updates `EnrollmentData` and `LearnerCourseGradeMetrics` records
* It calculate course progress from EnrollmentData records
This generates the same metrics as the original enrollment_metrics modules,
but does it differently.
## How it differs from the previous version
This module improves on the existing enrollment metrics collection module,
`figures.pipeline.enrollment_metrics`
* It separates the activities to create and update Figures per-enrollment data
collected
* This separation lets Figures run metrics in distinct stages
* First, collect per-enrollment data
* Second, aggregate metrics based on collected data
* This provides a workflow that is easier to resume if interrupted
* This provides workflow that is simpler to debug
* This simplifies and speeds up aggregate progress metrics, collapsing complex
code into a single Django queryset aggregation
* This update lays groundwork for further metrics improvements and enhancements
such as metrics on subsets of learners in a course or progress of subsets of
learners across courses
# Developer Notes
This module provides
"""
from django.db.models import Avg
from figures.course import Course
from figures.helpers import utc_yesterday
from figures.models import EnrollmentData
from figures.sites import UnlinkedCourseError


def update_enrollment_data_for_course(course_id):
"""Updates Figures per-enrollment data for enrollments in the course
Checks for and creates new `LearnerCourseGradeMetrics` records and updates
`EnrollmentData` records
Return results are a list of the results returned by `update_enrollment_data`
"""
date_for = utc_yesterday()
the_course = Course(course_id)
if not the_course.site:
raise UnlinkedCourseError('No site found for course "{}"'.format(course_id))

# Any updated student module records? if so, then get the unique enrollments
# for each enrollment, check if LGCM is out of date or up to date
active_enrollments = the_course.enrollments_active_on_date(date_for)
return [EnrollmentData.objects.update_metrics(the_course.site, ce)
for ce in active_enrollments]


def calculate_course_progress(course_id):
"""Return average progress percentage for all enrollments in the course
"""
results = EnrollmentData.objects.filter(course_id=str(course_id)).aggregate(
average_progress=Avg('progress_percent'))

# This is a bit of a hack. When we overhaul progress data, we should really
# have None for progress if there's no data. But check how SQL AVG performs
if results['average_progress'] is None:
results['average_progress'] = 0.0
return results
Loading

0 comments on commit deaa5fd

Please sign in to comment.