Merge pull request #145 from jamalex/partition_specific_instance_counters

First pass implementation of FSIC v2 format, to fix #144
bjester authored Feb 1, 2022
2 parents 7eb702f + 5be6f0a commit ce83e08
Showing 17 changed files with 1,155 additions and 200 deletions.
5 changes: 3 additions & 2 deletions .gitignore
@@ -95,5 +95,6 @@ Pipfile
# Jetbrains IDE
.idea

tests/testapp/testapp.db
tests/testapp/testapp2.db
*.db

.vscode
2 changes: 1 addition & 1 deletion morango/__init__.py
@@ -3,4 +3,4 @@
from __future__ import unicode_literals

default_app_config = "morango.apps.MorangoConfig"
__version__ = "0.6.7"
__version__ = "0.6.8a0"
1 change: 1 addition & 0 deletions morango/constants/capabilities.py
@@ -1,3 +1,4 @@
GZIP_BUFFER_POST = "GZIP_BUFFER_POST"
ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING"
ASYNC_OPERATIONS = "ASYNC_OPERATIONS"
FSIC_V2_FORMAT = "FSIC_V2_FORMAT"
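
The new FSIC_V2_FORMAT capability string is what lets two morango instances agree on the format during a sync. As a rough sketch (the function and variable names below are illustrative assumptions, not morango's actual negotiation code), a caller might gate on it like this:

    from morango.constants.capabilities import FSIC_V2_FORMAT

    def should_use_fsic_v2(local_capabilities, remote_capabilities):
        # Hypothetical helper: use the v2 format only when both ends of the
        # sync advertise support for it.
        return FSIC_V2_FORMAT in set(local_capabilities) & set(remote_capabilities)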
1 change: 1 addition & 0 deletions morango/constants/settings.py
@@ -2,6 +2,7 @@
MORANGO_SERIALIZE_BEFORE_QUEUING = True
MORANGO_DESERIALIZE_AFTER_DEQUEUING = True
MORANGO_DISALLOW_ASYNC_OPERATIONS = False
MORANGO_DISABLE_FSIC_V2_FORMAT = False
MORANGO_INSTANCE_INFO = {}
MORANGO_INITIALIZE_OPERATIONS = (
"morango.sync.operations:InitializeOperation",
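
Given that the new MORANGO_DISABLE_FSIC_V2_FORMAT setting defaults to False, a project can presumably opt back into the v1 behavior from its Django settings; the exact effect of the flag is an assumption based on its name and default:

    # settings.py of a project using morango (sketch): keep using the legacy
    # v1 FSIC format even when a sync peer supports v2.
    MORANGO_DISABLE_FSIC_V2_FORMAT = True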
8 changes: 8 additions & 0 deletions morango/errors.py
@@ -68,3 +68,11 @@ class MorangoLimitExceeded(MorangoError):

class InvalidMorangoSourceId(MorangoError):
    pass


class MorangoInvalidFSICPartition(MorangoError):
    pass


class MorangoSkipOperation(MorangoError):
    pass
2 changes: 1 addition & 1 deletion morango/models/certificates.py
@@ -338,7 +338,7 @@ def __init__(self, template, params={}):
        self._template = template
        self._params = params
        self._filter_string = string.Template(template).safe_substitute(params)
        self._filter_tuple = tuple(self._filter_string.split())
        self._filter_tuple = tuple(self._filter_string.split()) or ("",)

    def is_subset_of(self, other):
        for partition in self._filter_tuple:
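
The or ("",) fallback matters because "".split() returns an empty list, so an empty filter template used to collapse into an empty _filter_tuple, leaving prefix checks like is_subset_of with nothing to iterate over. With ("",), an empty filter behaves as a match-everything filter, since every partition string starts with the empty string. A quick illustration in plain Python:

    # Before: an empty template collapses to an empty tuple.
    assert tuple("".split()) == ()

    # After: fall back to a single empty-string partition.
    assert (tuple("".split()) or ("",)) == ("",)

    # The empty string is a prefix of every partition, so such a filter matches all.
    assert "p1:user1:xyz".startswith("")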
230 changes: 151 additions & 79 deletions morango/models/core.py
@@ -1,5 +1,6 @@
from __future__ import unicode_literals

import functools
import json
import logging
import uuid
@@ -12,6 +13,7 @@
from django.db.models import F
from django.db.models import Func
from django.db.models import Max
from django.db.models import Q
from django.db.models import TextField
from django.db.models import Value
from django.db.models.deletion import Collector
@@ -32,6 +34,7 @@
from morango.models.fields.uuids import sha2_uuid
from morango.models.fields.uuids import UUIDField
from morango.models.fields.uuids import UUIDModelMixin
from morango.models.fsic_utils import remove_redundant_instance_counters
from morango.models.manager import SyncableModelManager
from morango.models.morango_mptt import MorangoMPTTModel
from morango.models.utils import get_0_4_system_parameters
@@ -288,10 +291,16 @@ class TransferSession(models.Model):

# stages and stage status of transfer session
transfer_stage = models.CharField(
max_length=20, choices=transfer_stages.CHOICES, blank=True, null=True,
max_length=20,
choices=transfer_stages.CHOICES,
blank=True,
null=True,
)
transfer_stage_status = models.CharField(
max_length=20, choices=transfer_statuses.CHOICES, blank=True, null=True,
max_length=20,
choices=transfer_statuses.CHOICES,
blank=True,
null=True,
)

@property
@@ -572,97 +581,160 @@ class Meta:

    @classmethod
    @transaction.atomic
    def update_fsics(cls, fsics, sync_filter, v2_format=False):
        internal_fsic = DatabaseMaxCounter.calculate_filter_specific_instance_counters(
            sync_filter, v2_format=v2_format
        )
        if v2_format:
            internal_fsic = internal_fsic["sub"]
            fsics = fsics["sub"]
            for part, insts in fsics.items():
                for inst, counter in insts.items():
                    if internal_fsic.get(part, {}).get(inst, 0) < counter:
                        DatabaseMaxCounter.objects.update_or_create(
                            instance_id=inst,
                            partition=part,
                            defaults={"counter": counter},
                        )
        else:
            updated_fsic = {}
            for key, value in six.iteritems(fsics):
                if key in internal_fsic:
                    # if same instance id, update fsic with larger value
                    if fsics[key] > internal_fsic[key]:
                        updated_fsic[key] = fsics[key]
                else:
                    # if instance id is not present, add it to updated fsics
                    updated_fsic[key] = fsics[key]

            # load database max counters
            for (key, value) in six.iteritems(updated_fsic):
                for f in sync_filter:
                    DatabaseMaxCounter.objects.update_or_create(
                        instance_id=key, partition=f, defaults={"counter": value}
                    )
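
Based on the raw_fsic structure assembled later in this diff, a v2 FSIC splits counters into a "super" section (counters on partitions that are prefixes of the filter) and a "sub" section (counters on partitions under the filter). A sketch of the shape update_fsics consumes, with invented instance ids and counter values:

    # A v2 FSIC as passed to update_fsics(..., v2_format=True); values invented.
    fsics = {
        "super": {"": {"instance_a": 7}},  # counters on prefixes of the filter
        "sub": {
            "p1:user1": {"instance_a": 7, "instance_b": 3},
            "p1:user2": {"instance_b": 12},
        },
    }
    # update_fsics walks only fsics["sub"], bumping each local DatabaseMaxCounter
    # via update_or_create wherever the remote counter is higher.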

    @classmethod
    def get_instance_counters_for_partitions(cls, partitions, is_producer=False):
        queryset = cls.objects.filter(partition__in=partitions)

        # filter out instance_ids that are no longer relevant
        if is_producer:
            # for the producing side, we only need instance_ids that are still on Store records
            matching_instances = Store.objects.filter(
                partition__startswith=models.OuterRef("partition"),
                last_saved_instance=models.OuterRef("instance_id"),
            ).values("last_saved_instance")
        else:
            # for the receiving side, we include all instances that still exist in the RecordMaxCounters
            matching_instances = RecordMaxCounter.objects.filter(
                store_model__partition__startswith=models.OuterRef("partition"),
                instance_id=models.OuterRef("instance_id"),
            ).values("instance_id")

        # manually add the EXISTS clause to avoid a Django bug that emits an
        # `= TRUE` comparison, which kills Postgres performance
        exists = models.Exists(matching_instances).resolve_expression(
            query=queryset.query, allow_joins=True, reuse=None
        )
        queryset.query.where.add(exists, "AND")

        queryset = queryset.values_list("partition", "instance_id", "counter")

        counters = {}
        for partition, instance_id, counter in queryset:
            if partition not in counters:
                counters[partition] = {}
            counters[partition].update({instance_id: counter})
        return counters
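
The return value is a nested dict keyed first by partition and then by instance_id. For example (partition names and counters invented):

    # DatabaseMaxCounter.get_instance_counters_for_partitions(
    #     {"p1:user1", "p1:user2"}, is_producer=True
    # ) might return:
    # {
    #     "p1:user1": {"instance_a": 3},
    #     "p1:user2": {"instance_a": 5, "instance_b": 1},
    # }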

    @classmethod
    def calculate_filter_specific_instance_counters(
        cls,
        filters,
        is_producer=False,
        v2_format=False,
    ):
        """
        Returns a dict that maps instance_ids to their respective "high-water level" counters with
        respect to the provided list of filter partitions, based on what the local database contains.

        First, for each partition in the filter, it calculates the maximum values the database has
        received through any filters containing that partition. Then, it combines these dicts into
        a single dict, collapsing across the filter partitions.

        In Morango 0.6.5 and below, this was always calculated based on the "minimum" values for
        each instance_id, with instance_ids that didn't exist in *each* of the partitions being
        excluded entirely. When the producing side had records needing to be sent for an instance
        under one of the filter partitions, but not under another, that instance would not be
        included in the FSIC, and the data would not be sent, as shown in:
        https://github.com/learningequality/kolibri/issues/8439

        The solution was to add an asymmetry in how FSICs are calculated: the sending side now
        uses a "max" instead of a "min" to ensure everything is included, while the receiving
        side still uses a "min" (though after it has completed a sync, it updates its counters
        such that the min and max should then be equivalent).

        One potential issue remains, but it is an edge case that can be worked around:
        - We now take the maxes across the filter partitions and use those as the producer FSICs.
        - When the receiver finishes integrating the received data, it updates its counters to match.
        - If the sender had actually done a sync with just a subset of those filters in the past, it
          might not actually have received everything available for the other filters, and hence the
          receiver may not be correct in assuming it now has everything up to the levels of the
          producer's FSICs (as it does by taking the "max" across the filter partition FSICs).

        There are two ways to avoid this:
        - Don't sync with differing subsets of the same partitions across multiple syncs. For
          example, if you do syncs with filters "AB" and "AC", don't also do syncs with filters
          "AC" and "AD". This is the approach that makes this work in Kolibri, for now.
        - OR: Don't do syncs with more than one filter partition at a time; do each one in sequence.
          For example, rather than pushing "AB" and "AC" in a single transfer session, do one pull
          for "AB" and then another one for "AC". This has the disadvantage of a bit of extra
          overhead, but would likely be the most robust option, and the easiest to enforce and
          reason about.
        """

        if v2_format:

            queryset = cls.objects.all()

            # get the DMC records with partitions that fall under the filter prefixes
            sub_condition = functools.reduce(
                lambda x, y: x | y,
                [Q(partition__startswith=prefix) for prefix in filters],
            )
            sub_partitions = set(
                queryset.filter(sub_condition)
                .values_list("partition", flat=True)
                .distinct()
            )

            # get the DMC records with partitions that are prefixes of the filters
            super_partitions = set()
            for filt in filters:
                qs = (
                    queryset.annotate(
                        filter_matches=ValueStartsWithField(filt, "partition")
                    )
                    .filter(filter_matches=True)
                    .exclude(partition=filt)
                )
                super_partitions.update(
                    qs.values_list("partition", flat=True).distinct()
                )

            # get the instance counters for the partitions, also filtering out
            # instance_ids that are no longer needed
            super_fsics = cls.get_instance_counters_for_partitions(
                super_partitions, is_producer=is_producer
            )
            sub_fsics = cls.get_instance_counters_for_partitions(
                sub_partitions, is_producer=is_producer
            )

            raw_fsic = {
                "super": super_fsics,
                "sub": sub_fsics,
            }

            # remove instance counters on sub partitions that are redundant
            # with counters on super partitions
            remove_redundant_instance_counters(raw_fsic)

            return raw_fsic

        else:

            queryset = cls.objects.all()

            per_filter_max = []

            for filt in filters:
                # {filt} LIKE partition || '%'
                qs = queryset.annotate(
                    filter_matches=ValueStartsWithField(filt, "partition")
                )
                qs = qs.filter(filter_matches=True)
                filt_maxes = qs.values("instance_id").annotate(maxval=Max("counter"))
                per_filter_max.append(
                    {dmc["instance_id"]: dmc["maxval"] for dmc in filt_maxes}
                )

            instance_id_lists = [maxes.keys() for maxes in per_filter_max]
            all_instance_ids = reduce(set.union, instance_id_lists, set())
            if is_producer:
                # when we're sending, we want to make sure we include everything
                result = {
                    instance_id: max([d.get(instance_id, 0) for d in per_filter_max])
                    for instance_id in all_instance_ids
                }
            else:
                # when we're receiving, we don't want to overpromise on what we have
                result = {
                    instance_id: min([d.get(instance_id, 0) for d in per_filter_max])
                    for instance_id in reduce(
                        set.intersection, instance_id_lists, all_instance_ids
                    )
                }

            return result
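
A worked example of the v1 producer/receiver asymmetry described in the docstring, using invented counters for two filter partitions:

    # Suppose filters "AB" and "AC" yield these per-filter maxes:
    per_filter_max = [
        {"i1": 5, "i2": 2},  # maxes seen under "AB"
        {"i1": 3},           # maxes seen under "AC" ("i2" never seen here)
    ]
    # Producer (is_producer=True): take the max over the union, so nothing
    # is left unsent: {"i1": 5, "i2": 2}
    # Receiver (is_producer=False): take the min over the intersection, so it
    # never overpromises what it holds: {"i1": 3} ("i2" is dropped because it
    # is absent under "AC")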
