From 71605dbfb39e5433388215ac478a5fcf15d52578 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Mon, 24 Jan 2022 13:22:34 -0800 Subject: [PATCH 1/8] First pass (WIP) implementation of FSIC format v2 --- .gitignore | 5 +- morango/constants/capabilities.py | 1 + morango/constants/settings.py | 1 + morango/models/core.py | 215 ++++++++----- morango/models/fsic_utils.py | 124 ++++++++ morango/sync/operations.py | 147 +++++++-- morango/utils.py | 4 + tests/testapp/tests/models/test_core.py | 9 +- tests/testapp/tests/models/test_fsic_utils.py | 248 +++++++++++++++ tests/testapp/tests/sync/test_operations.py | 294 ++++++++++++++---- tests/testapp/tests/test_utils.py | 9 + 11 files changed, 890 insertions(+), 167 deletions(-) create mode 100644 morango/models/fsic_utils.py create mode 100644 tests/testapp/tests/models/test_fsic_utils.py diff --git a/.gitignore b/.gitignore index 6e708dab..d40ef0ca 100644 --- a/.gitignore +++ b/.gitignore @@ -95,5 +95,6 @@ Pipfile # Jetbrains IDE .idea -tests/testapp/testapp.db -tests/testapp/testapp2.db +*.db + +.vscode \ No newline at end of file diff --git a/morango/constants/capabilities.py b/morango/constants/capabilities.py index ad2a906f..031e50b9 100644 --- a/morango/constants/capabilities.py +++ b/morango/constants/capabilities.py @@ -1,3 +1,4 @@ GZIP_BUFFER_POST = "GZIP_BUFFER_POST" ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING" ASYNC_OPERATIONS = "ASYNC_OPERATIONS" +FSIC_V2_FORMAT = "FSIC_V2_FORMAT" \ No newline at end of file diff --git a/morango/constants/settings.py b/morango/constants/settings.py index 22b3f70b..c6961ff9 100644 --- a/morango/constants/settings.py +++ b/morango/constants/settings.py @@ -2,6 +2,7 @@ MORANGO_SERIALIZE_BEFORE_QUEUING = True MORANGO_DESERIALIZE_AFTER_DEQUEUING = True MORANGO_DISALLOW_ASYNC_OPERATIONS = False +MORANGO_DISABLE_FSIC_V2_FORMAT = False MORANGO_INSTANCE_INFO = {} MORANGO_INITIALIZE_OPERATIONS = ( "morango.sync.operations:InitializeOperation", diff --git a/morango/models/core.py b/morango/models/core.py index 8fddf0c6..bcf17188 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -1,5 +1,7 @@ from __future__ import unicode_literals +from collections import defaultdict +import functools import json import logging import uuid @@ -12,6 +14,7 @@ from django.db.models import F from django.db.models import Func from django.db.models import Max +from django.db.models import Q from django.db.models import TextField from django.db.models import Value from django.db.models.deletion import Collector @@ -32,6 +35,7 @@ from morango.models.fields.uuids import sha2_uuid from morango.models.fields.uuids import UUIDField from morango.models.fields.uuids import UUIDModelMixin +from morango.models.fsic_utils import remove_redundant_instance_counters from morango.models.manager import SyncableModelManager from morango.models.morango_mptt import MorangoMPTTModel from morango.models.utils import get_0_4_system_parameters @@ -288,10 +292,16 @@ class TransferSession(models.Model): # stages and stage status of transfer session transfer_stage = models.CharField( - max_length=20, choices=transfer_stages.CHOICES, blank=True, null=True, + max_length=20, + choices=transfer_stages.CHOICES, + blank=True, + null=True, ) transfer_stage_status = models.CharField( - max_length=20, choices=transfer_statuses.CHOICES, blank=True, null=True, + max_length=20, + choices=transfer_statuses.CHOICES, + blank=True, + null=True, ) @property @@ -572,97 +582,148 @@ class Meta: @classmethod @transaction.atomic - def update_fsics(cls, 
fsics, sync_filter): + def update_fsics(cls, fsics, sync_filter, v2_format=True): internal_fsic = DatabaseMaxCounter.calculate_filter_specific_instance_counters( - sync_filter + sync_filter, v2_format=v2_format ) - updated_fsic = {} - for key, value in six.iteritems(fsics): - if key in internal_fsic: - # if same instance id, update fsic with larger value - if fsics[key] > internal_fsic[key]: + if v2_format: + internal_fsic = internal_fsic["sub"] + fsics = fsics["sub"] + for part, insts in fsics.items(): + for inst, counter in insts.items(): + if internal_fsic.get("part", {}).get(inst, 0) < counter: + DatabaseMaxCounter.objects.update_or_create( + instance_id=inst, partition=part, defaults={"counter": counter} + ) + else: + updated_fsic = {} + for key, value in six.iteritems(fsics): + if key in internal_fsic: + # if same instance id, update fsic with larger value + if fsics[key] > internal_fsic[key]: + updated_fsic[key] = fsics[key] + else: + # if instance id is not present, add it to updated fsics updated_fsic[key] = fsics[key] - else: - # if instance id is not present, add it to updated fsics - updated_fsic[key] = fsics[key] - - # load database max counters - for (key, value) in six.iteritems(updated_fsic): - for f in sync_filter: - DatabaseMaxCounter.objects.update_or_create( - instance_id=key, partition=f, defaults={"counter": value} - ) + + # load database max counters + for (key, value) in six.iteritems(updated_fsic): + for f in sync_filter: + DatabaseMaxCounter.objects.update_or_create( + instance_id=key, partition=f, defaults={"counter": value} + ) @classmethod - def calculate_filter_specific_instance_counters(cls, filters, is_producer=False): + def get_instance_counters_for_partition(cls, part, only_if_in_store=False): """ - Returns a dict that maps instance_ids to their respective "high-water level" counters with - respect to the provided list of filter partitions, based on what the local database contains. - - First, for each partition in the filter, it calculates the maximum values the database has - received through any filters containing that partition. - - Then, it combines these dicts into a single dict, collapsing across the filter partitions. - In Morango 0.6.5 and below, this was always calculated based on the "minimum" values for - each instance_id, and with instance_ids that didn't exist in *each* of the partitions being - excluded entirely. When the producing side had records needing to be sent for an instance - under one of the filter partitions, but not under another, it would not be included in the - FSIC and thereby lead to the data not being sent, as showed up in: - https://github.com/learningequality/kolibri/issues/8439 - - The solution was to add an asymmetry in how FSICs are calculated, with the sending side - using a "max" instead of a "min" to ensure everything is included, and then the receiving - side still using a "min" (though after it has completed a sync, it updates its counters - such that the min and max should then be equivalent). - - One potential issue remains, but it is an edge case that can be worked around: - - We now take the maxes across the filter partitions and use those as the producer FSICs. - - When the receiver finishes integrating the received data, it updates its counters to match. 
- - If the sender had actually done a sync with just a subset of those filters in the past, it - might not actually have received everything available for the other filters, and hence the - receiver may not be correct in assuming it now has everything up to the levels of the - producer's FSICs (as it does by taking the "max" across the filter partition FSICs). - There are two ways to avoid this: - - Don't sync with differing subsets of the same partitions across multiple syncs. For - example, if you do syncs with filters "AB" and "AC", don't also do syncs with filters - "AC" and "AD". This is the approach that makes this work in Kolibri, for now. - - OR: Don't do syncs with more than one filter partition at a time. Do each one in sequence. - For example, rather than pushing "AB" and "AC" in a single transfer session, do one pull - for AB and then another one for AC. This has the disadvantage of a bit of extra overhead, - but would likely be the most robust option, and the easiest to enforce and reason about. + Return a dict of {instance_id: counter} for DMC's with the given partition. + If only_if_in_store is True, only return DMCs whose instance_id is still in the Store. """ + queryset = cls.objects.filter(partition=part) + if only_if_in_store: + matching_store_instances = ( + Store.objects.filter(partition__startswith=part) + .values_list("last_saved_instance") + .distinct() + ) + queryset = queryset.filter(instance_id__in=matching_store_instances) + return dict(queryset.values_list("instance_id", "counter")) - queryset = cls.objects.all() + @classmethod + def calculate_filter_specific_instance_counters( + cls, + filters, + is_producer=False, + v2_format=True, + only_if_in_store=False, + remove_redundant=True, + ): - per_filter_max = [] + if v2_format: - for filt in filters: - # {filt} LIKE partition || '%' - qs = queryset.annotate( - filter_matches=ValueStartsWithField(filt, "partition") + queryset = cls.objects.all() + + # get the DMC records with partitions that fall under the filter prefixes + sub_condition = functools.reduce( + lambda x, y: x | y, + [Q(partition__startswith=prefix) for prefix in filters], ) - qs = qs.filter(filter_matches=True) - filt_maxes = qs.values("instance_id").annotate(maxval=Max("counter")) - per_filter_max.append( - {dmc["instance_id"]: dmc["maxval"] for dmc in filt_maxes} + sub_partitions = set( + queryset.filter(sub_condition) + .values_list("partition", flat=True) + .distinct() ) - instance_id_lists = [maxes.keys() for maxes in per_filter_max] - all_instance_ids = reduce(set.union, instance_id_lists, set()) - if is_producer: - # when we're sending, we want to make sure we include everything - result = { - instance_id: max([d.get(instance_id, 0) for d in per_filter_max]) - for instance_id in all_instance_ids + # get the DMC records with partitions that are prefixes of the filters + super_partitions = set() + for filt in filters: + qs = ( + queryset.annotate( + filter_matches=ValueStartsWithField(filt, "partition") + ) + .filter(filter_matches=True) + .exclude(partition=filt) + ) + super_partitions.update( + qs.values_list("partition", flat=True).distinct() + ) + + # get the instance counters for the partitions, filtered (if requested) by instance_ids still used in the store + super_fsics = {} + for part in super_partitions: + super_fsics[part] = cls.get_instance_counters_for_partition( + part, only_if_in_store=only_if_in_store + ) + sub_fsics = {} + for part in sub_partitions: + sub_fsics[part] = cls.get_instance_counters_for_partition( + part, 
only_if_in_store=only_if_in_store + ) + + raw_fsic = { + "super": super_fsics, + "sub": sub_fsics, } + + # if requested, remove instance counters on partitions that are redudant to counters on super partitions + if remove_redundant: + remove_redundant_instance_counters(raw_fsic) + + return raw_fsic + else: - # when we're receiving, we don't want to overpromise on what we have - result = { - instance_id: min([d.get(instance_id, 0) for d in per_filter_max]) - for instance_id in reduce( - set.intersection, instance_id_lists, all_instance_ids + + queryset = cls.objects.all() + + per_filter_max = [] + + for filt in filters: + # {filt} LIKE partition || '%' + qs = queryset.annotate( + filter_matches=ValueStartsWithField(filt, "partition") ) - } + qs = qs.filter(filter_matches=True) + filt_maxes = qs.values("instance_id").annotate(maxval=Max("counter")) + per_filter_max.append( + {dmc["instance_id"]: dmc["maxval"] for dmc in filt_maxes} + ) + + instance_id_lists = [maxes.keys() for maxes in per_filter_max] + all_instance_ids = reduce(set.union, instance_id_lists, set()) + if is_producer: + # when we're sending, we want to make sure we include everything + result = { + instance_id: max([d.get(instance_id, 0) for d in per_filter_max]) + for instance_id in all_instance_ids + } + else: + # when we're receiving, we don't want to overpromise on what we have + result = { + instance_id: min([d.get(instance_id, 0) for d in per_filter_max]) + for instance_id in reduce( + set.intersection, instance_id_lists, all_instance_ids + ) + } return result diff --git a/morango/models/fsic_utils.py b/morango/models/fsic_utils.py new file mode 100644 index 00000000..4cf578a3 --- /dev/null +++ b/morango/models/fsic_utils.py @@ -0,0 +1,124 @@ +from collections import defaultdict + + +def _build_prefix_mapper(keys, include_self=False): + """ + Returns a dict mapping each key to a list of keys that are its prefixes. + """ + prefix_mapper = defaultdict(list) + for key in keys: + for otherkey in keys: + if key.startswith(otherkey) and (include_self or key != otherkey): + prefix_mapper[key].append(otherkey) + return prefix_mapper + + +def _get_sub_partitions(partitions): + """ + Return a set of partitions that are sub-partitions of other partitions in the list. + """ + sub_partitions = set() + for partition in partitions: + for other_partition in partitions: + if partition.startswith(other_partition) and partition != other_partition: + sub_partitions.add(partition) + return set(sub_partitions) + + +def _merge_fsic_dicts(*dicts): + """ + Merge a list of dicts into a single dict. + """ + merged = {} + for d in dicts: + merged.update(d) + return merged + + +def remove_redundant_instance_counters(raw_fsic): + """ + Given a raw fsic dict with "sub" and "super" dicts of the form {partition: {instance_id: counter}}, remove any {instance_id: counter} + entries for which there are greater or equal counter values for the same instance under a partition that is a prefix of that partition. + Note: we leave empty dicts under a partition because that's needed to tell downstream functions that there is data for this partition. 
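    For illustration (values assumed, not taken from real data): given
        {"super": {"p": {"a": 5}}, "sub": {"p1": {"a": 3, "b": 2}}}
    the "a": 3 entry under "p1" is dropped, because its counter does not exceed the counter
    for "a" under the prefix partition "p", leaving
        {"super": {"p": {"a": 5}}, "sub": {"p1": {"b": 2}}}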
+ """ + assert "super" in raw_fsic + assert "sub" in raw_fsic + fsic_dicts = [raw_fsic["super"], raw_fsic["sub"]] + # build a combined dict from the provided fsic_dicts for easy querying + merged_dict = _merge_fsic_dicts(*fsic_dicts) + # map out the prefixes of each partition + prefix_mapper = _build_prefix_mapper(merged_dict.keys()) + # loop through fsic_dicts and remove entries for which a superpartition has equal or higher counter for same instance + for fsic_dict in fsic_dicts: + for part, sub_dict in list(fsic_dict.items()): + for superpart in prefix_mapper[part]: + super_dict = merged_dict[superpart] + for inst, counter in super_dict.items(): + if inst in sub_dict and sub_dict[inst] <= counter: + del sub_dict[inst] + + +def expand_fsic_for_use(raw_fsic): + """ + Convert the raw FSIC format from the wire into a format usable for filtering, by propagating super partition counts + down into sub-partitions. Returns only the expanded subpartition dict, discarding the super partitions. + """ + assert "super" in raw_fsic + assert "sub" in raw_fsic + raw_fsic = raw_fsic.copy() + subordinates = _get_sub_partitions(raw_fsic["sub"].keys()) + for sub_part, sub_fsic in raw_fsic["sub"].items(): + # skip any partitions that are subordinate to another sub-partition + if sub_part in subordinates: + continue + # look through the super partitions for any that are prefixes of this partition + for super_part, super_fsic in raw_fsic["super"].items(): + if sub_part.startswith(super_part): + # update the sub-partition's counters with any higher counters from the super-partition + for instance, counter in super_fsic.items(): + if counter > sub_fsic.get(instance, 0): + sub_fsic[instance] = counter + return raw_fsic["sub"] + + +def calculate_directional_fsic_diff(fsic1, fsic2): + """ + Calculate the (instance_id, counter) pairs that are the lower-bound levels for sending data from the + device with fsic1 to the device with fsic2. + + :param fsic1: dict containing (instance_id, counter) pairs for the sending device + :param fsic2: dict containing (instance_id, counter) pairs for the receiving device + :return ``dict`` of fsics to be used in queueing the correct records to the buffer + """ + return { + instance: fsic2.get(instance, 0) + for instance, counter in fsic1.items() + if fsic2.get(instance, 0) < counter + } + + +def calculate_directional_fsic_diff_v2(fsic1, fsic2): + """ + Calculate the (instance_id, counter) pairs that are the lower-bound levels for sending data from the + device with fsic1 to the device with fsic2. 
+ + FSIC v2 expanded format: {partition: {instance_id: counter}} + + :param fsic1: dict containing FSIC v2 in expanded format, for the sending device + :param fsic2: dict containing FSIC v2 in expanded format, for the receiving device + :return ``dict`` in expanded FSIC v2 format to be used in queueing the correct records to the buffer + """ + prefixes = _build_prefix_mapper(list(fsic1.keys()) + list(fsic2.keys()), include_self=True) + + result = defaultdict(dict) + + # look at all the partitions in the sending FSIC + for part, insts in fsic1.items(): + # check for counters in the sending FSIC that are higher than the receiving FSIC + for inst, sending_counter in insts.items(): + # get the maximum counter in the receiving FSIC for the same instance + receiving_counter = max(fsic2.get(prefix, {}).get(inst, 0) for prefix in prefixes[part]) + if receiving_counter < sending_counter: + result[part][inst] = receiving_counter + + return dict(result) \ No newline at end of file diff --git a/morango/sync/operations.py b/morango/sync/operations.py index 14dcc41d..829d7866 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -1,4 +1,6 @@ +from curses import raw import functools +import itertools import json import logging import uuid @@ -11,7 +13,6 @@ from django.db import transaction from django.db.models import Q from django.db.models import signals -from django.utils import six from django.utils import timezone from rest_framework.exceptions import ValidationError @@ -31,6 +32,11 @@ from morango.models.core import RecordMaxCounterBuffer from morango.models.core import Store from morango.models.core import TransferSession +from morango.models.fsic_utils import ( + calculate_directional_fsic_diff, + calculate_directional_fsic_diff_v2, + expand_fsic_for_use, +) from morango.registry import syncable_models from morango.sync.backends.utils import load_backend from morango.sync.context import LocalSessionContext @@ -90,22 +96,6 @@ def _self_referential_fk(model): return None -def _fsic_queuing_calc(fsic1, fsic2): - """ - We set the lower counter between two same instance ids. - If an instance_id exists in one fsic but not the other we want to give that counter a value of 0. - - :param fsic1: dictionary containing (instance_id, counter) pairs - :param fsic2: dictionary containing (instance_id, counter) pairs - :return ``dict`` of fsics to be used in queueing the correct records to the buffer - """ - return { - instance: fsic2.get(instance, 0) - for instance, counter in six.iteritems(fsic1) - if fsic2.get(instance, 0) < counter - } - - def _serialize_into_store(profile, filter=None): """ Takes data from app layer and serializes the models into the store. 
@@ -420,10 +410,13 @@ def _queue_into_buffer(transfersession): server_fsic = json.loads(transfersession.server_fsic) client_fsic = json.loads(transfersession.client_fsic) + if "sub" in server_fsic: + return _queue_into_buffer_v2(transfersession) + if transfersession.push: - fsics = _fsic_queuing_calc(client_fsic, server_fsic) + fsics = calculate_directional_fsic_diff(client_fsic, server_fsic) else: - fsics = _fsic_queuing_calc(server_fsic, client_fsic) + fsics = calculate_directional_fsic_diff(server_fsic, client_fsic) # if fsics are identical or receiving end has newer data, then there is nothing to queue if not fsics: @@ -531,6 +524,118 @@ def _queue_into_buffer(transfersession): ) +@transaction.atomic(using=USING_DB) +def _queue_into_buffer_v2(transfersession): + """ + Takes a chunk of data from the store to be put into the buffer to be sent to another morango instance. + + ALGORITHM: We do Filter Specific Instance Counter arithmetic to get our newest data compared to the server's older data. + We use raw sql queries to place data in the buffer and the record max counter buffer, which matches the conditions of the FSIC. + + This version uses the new FSIC format that is split out by partition, divided into sub partitions (the ones under the filter) + and super partitions (prefixes of the sub partitions). + """ + filter_prefixes = Filter(transfersession.filter) + server_fsic = json.loads(transfersession.server_fsic) + client_fsic = json.loads(transfersession.client_fsic) + + assert "sub" in server_fsic + assert "super" in server_fsic + assert "sub" in client_fsic + assert "super" in client_fsic + + server_fsic = expand_fsic_for_use(server_fsic) + client_fsic = expand_fsic_for_use(client_fsic) + + if transfersession.push: + fsics = calculate_directional_fsic_diff_v2(client_fsic, server_fsic) + else: + fsics = calculate_directional_fsic_diff_v2(server_fsic, client_fsic) + + # if fsics are identical or receiving end has newer data, then there is nothing to queue + if not fsics: + return + + profile_condition = ["profile = '{}'".format(transfersession.sync_session.profile)] + + # create condition for filtering by partitions + partition_conditions = [] + for part, insts in fsics.items(): + if insts: + partition_conditions.append( + "partition LIKE '{}%' AND (".format(part) + + _join_with_logical_operator( + [ + "(last_saved_instance = '{}' AND last_saved_counter > {})".format( + inst, counter + ) + for inst, counter in insts.items() + ], + "OR", + ) + + ")" + ) + partition_conditions = [_join_with_logical_operator(partition_conditions, "OR")] + + # combine conditions and filter by profile + where_condition = _join_with_logical_operator( + profile_condition + partition_conditions, "AND" + ) + + select_buffers = [] + select_rmc_buffers = [] + + # execute raw sql to take all records that match condition, to be put into buffer for transfer + select_buffers.append( + """SELECT DISTINCT + id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, + partition, source_id, conflicting_serialized_data, + CAST ('{transfer_session_id}' AS {transfer_session_id_type}), _self_ref_fk + FROM {store} WHERE {condition} + """.format( + transfer_session_id=transfersession.id, + transfer_session_id_type=TransferSession._meta.pk.rel_db_type(connection), + condition=where_condition, + store=Store._meta.db_table, + ) + ) + # take all record max counters that are foreign keyed onto store models, which were queued into the buffer + select_rmc_buffers.append( + """SELECT 
instance_id, counter, CAST ('{transfer_session_id}' AS {transfer_session_id_type}), store_model_id + FROM {record_max_counter} AS rmc + INNER JOIN {outgoing_buffer} AS buffer ON rmc.store_model_id = buffer.model_uuid + WHERE buffer.transfer_session_id = '{transfer_session_id}' + """.format( + transfer_session_id=transfersession.id, + transfer_session_id_type=TransferSession._meta.pk.rel_db_type(connection), + record_max_counter=RecordMaxCounter._meta.db_table, + outgoing_buffer=Buffer._meta.db_table, + ) + ) + + with connection.cursor() as cursor: + cursor.execute( + """INSERT INTO {outgoing_buffer} + (model_uuid, serialized, deleted, last_saved_instance, last_saved_counter, + hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data, + transfer_session_id, _self_ref_fk) + {select} + """.format( + outgoing_buffer=Buffer._meta.db_table, + select=" UNION ".join(select_buffers), + ) + ) + cursor.execute( + """INSERT INTO {outgoing_rmcb} + (instance_id, counter, transfer_session_id, model_uuid) + {select} + """.format( + outgoing_rmcb=RecordMaxCounterBuffer._meta.db_table, + select=" UNION ".join(select_rmc_buffers), + ) + ) + + @transaction.atomic(using=USING_DB) def _dequeue_into_store(transfersession): """ @@ -1141,7 +1246,9 @@ def handle(self, context): # workflow we need to update the network server when pushing to say how many records we've queued. For pull, # we handle that in the initialization/creation of the transfer session, since that's when it's first available. if context.is_push: - self.update_transfer_session(context, records_total=context.transfer_session.records_total) + self.update_transfer_session( + context, records_total=context.transfer_session.records_total + ) return transfer_statuses.COMPLETED diff --git a/morango/utils.py b/morango/utils.py index d53e47ec..27550dfd 100644 --- a/morango/utils.py +++ b/morango/utils.py @@ -8,6 +8,7 @@ from morango.constants.capabilities import ALLOW_CERTIFICATE_PUSHING from morango.constants.capabilities import GZIP_BUFFER_POST from morango.constants.capabilities import ASYNC_OPERATIONS +from morango.constants.capabilities import FSIC_V2_FORMAT def do_import(import_string): @@ -52,6 +53,9 @@ def get_capabilities(): if SETTINGS.ALLOW_CERTIFICATE_PUSHING: capabilities.add(ALLOW_CERTIFICATE_PUSHING) + if not SETTINGS.MORANGO_DISABLE_FSIC_V2_FORMAT: + capabilities.add(FSIC_V2_FORMAT) + # Middleware async operation capabilities are standard in 0.6.0 and above if not SETTINGS.MORANGO_DISALLOW_ASYNC_OPERATIONS: capabilities.add(ASYNC_OPERATIONS) diff --git a/tests/testapp/tests/models/test_core.py b/tests/testapp/tests/models/test_core.py index bc470216..264a23bc 100644 --- a/tests/testapp/tests/models/test_core.py +++ b/tests/testapp/tests/models/test_core.py @@ -1,9 +1,9 @@ import factory import uuid from django.test import TestCase +from django.test import override_settings from django.utils import timezone from django.utils.six import iteritems - from facility_profile.models import MyUser from morango.constants import transfer_stages @@ -20,8 +20,8 @@ class DatabaseMaxCounterFactory(factory.DjangoModelFactory): class Meta: model = DatabaseMaxCounter - -class FilterMaxCounterTestCase(TestCase): +@override_settings(MORANGO_DISABLE_FSIC_V2_FORMAT=True) +class OldFilterMaxCounterTestCase(TestCase): def setUp(self): self.instance_a = "a" * 32 self.prefix_a = "AAA" @@ -103,7 +103,8 @@ def test_producer_vs_receiver_fsics(self): self.assertEqual(fsic_receiver.get(self.instance_b, 0), 10) -class 
DatabaseMaxCounterUpdateCalculation(TestCase): +@override_settings(MORANGO_DISABLE_FSIC_V2_FORMAT=True) +class OldDatabaseMaxCounterUpdateCalculation(TestCase): def setUp(self): self.filter = "filter" diff --git a/tests/testapp/tests/models/test_fsic_utils.py b/tests/testapp/tests/models/test_fsic_utils.py new file mode 100644 index 00000000..be825cf5 --- /dev/null +++ b/tests/testapp/tests/models/test_fsic_utils.py @@ -0,0 +1,248 @@ +from morango.models.fsic_utils import expand_fsic_for_use +from morango.models.fsic_utils import remove_redundant_instance_counters +from morango.models.fsic_utils import calculate_directional_fsic_diff_v2 + +from django.test import TestCase + +class TestFSICUtils(TestCase): + + def test_expand_fsic_for_use(self): + source_fsic = { + "super": { + "p": { + "a": 5, + "b": 3, + "c": 7, + }, + }, + "sub": { + "p1": { + "a": 1, + "b": 9, + "d": 2, + }, + "p1i": { + "e": 5, + }, + "p2i": { + "e": 5, + }, + }, + } + expected_fsic = { + "p1": { + "a": 5, # from super, because it was larger + "b": 9, # from sub, because it was larger + "c": 7, # from super, because it didn't exist in sub + "d": 2, # from sub, because it didn't exist in super + }, + "p1i": { # only instance here, because others from super were covered by p1 + "e": 5, + }, + "p2i": { # but no prefix in sub for this one, so it does inherit from super + "a": 5, + "b": 3, + "c": 7, + "e": 5, + }, + } + self.assertEqual(expand_fsic_for_use(source_fsic), expected_fsic) + + def test_remove_redundant_instance_counters(self): + source_fsic = { + "super": { + "p": { + "a": 5, + "b": 3, + "c": 7, + }, + "p3": { + "a": 1, # will be stripped out, because lower than p's counter + "c": 8, # will be kept, because higher than p's counter + "d": 14, # will be kept, as it's not in p's counter + }, + }, + "sub": { + "p1": { + "a": 5, # will be stripped out, because lower than p's counter + "b": 9, # will be kept, because higher than p's counter + "c": 7, # will be stripped out, because same as p's counter + "d": 2, # will be kept, because it's not in p's counter + }, + "p1i": { + "e": 5, # will be kept, because it's not in p's counter + "c": 4, # will be stripped out, because lower than p's and p1's counter + "d": 1, # will be stripped out, because lower than p1's counter + }, + "p1j": { # will be an empty dict, because these counters are all <= p's counters + "b": 3, + "c": 5, + }, + "p2i": { + "a": 5, # will be stripped out, because same as p's counter + "e": 5, # will be kept, because it's not in p's counter + }, + "p3i": { + "a": 8, # will be kept, because it's higher than p's and p3's counter + "c": 5, # will be stripped out, because lower than p's and p3's counter + "d": 2, # will be stripped out, because lower than p3's counter + }, + } + } + expected_fsic = { + "super": { + "p": { + "a": 5, + "b": 3, + "c": 7, + }, + "p3": { + "c": 8, + "d": 14, + }, + }, + "sub": { + "p1": { + "b": 9, + "d": 2, + }, + "p1i": { + "e": 5, + }, + "p1j": {}, + "p2i": { + "e": 5, + }, + "p3i": { + "a": 8, + }, + } + } + remove_redundant_instance_counters(source_fsic) + self.assertEqual(source_fsic, expected_fsic) + + def test_calculate_directional_fsic_diff_v2(self): + sending_fsic = { + "p": { + "a": 5, + "c": 7, + }, + "p1": { + "b": 9, + "d": 2, + }, + "p1i": { + "a": 7, + "e": 6, + "f": 1, + }, + "p2": { + "a": 8, + "q": 5, + }, + } + receiving_fsic = { + "p": { + "a": 3, # will be included, because it's lower than sender + "b": 4, # won't be included, because it doesn't exist in sender + "c": 9, # won't be included, because it's 
higher than sender + }, + "p1": { # will be excluded, because it's the same as in sender + "b": 9, + "d": 2, + }, + "p1i": { + "a": 6, # will be included, because it's lower than sender + "e": 5, # will be included, because it's lower than sender + "c": 9, # won't be included, because it's higher than in p in sender + # "f" # will be included at 0, as it doesn't exist in receiver + }, + # "p2": { + # "a" # will be included at 3, because it's in prefix partition p in receiver + # "q" # will be included at 0, because it's not in the receiver + # }, + "p3": { # will be excluded, because it's not in the source + "a": 2, + "c": 3, + } + } + expected_diff = { + "p": { + "a": 3, + }, + "p1i": { + "a": 6, + "e": 5, + "f": 0, + }, + "p2": { + "a": 3, + "q": 0, + }, + } + self.assertEqual(calculate_directional_fsic_diff_v2(sending_fsic, receiving_fsic), expected_diff) + + def test_calculate_directional_fsic_diff_v2_identical(self): + sending_fsic = receiving_fsic = { + "p": { + "a": 5, + "c": 7, + }, + "p1": { + "b": 9, + "d": 2, + }, + "p1i": { + "a": 7, + "e": 6, + "f": 1, + }, + "p2": { + "a": 8, + "q": 5, + }, + } + expected_diff = {} + self.assertEqual(calculate_directional_fsic_diff_v2(sending_fsic, receiving_fsic), expected_diff) + + def test_calculate_directional_fsic_diff_v2_receiver_is_higher(self): + sending_fsic = { + "p": { + "a": 5, + "c": 7, + }, + "p1": { + "b": 9, + "d": 2, + }, + "p1i": { + "a": 7, + "e": 6, + "f": 1, + }, + "p2": { + "a": 8, + "q": 5, + }, + } + receiving_fsic = { + "p": { + "a": 6, + "c": 9, + }, + "p1": { + "b": 11, + "d": 12, + }, + "p1i": { + "a": 71, + "e": 16, + "f": 21, + }, + "p2": { + "a": 48, + "q": 51, + }, + } + expected_diff = {} + self.assertEqual(calculate_directional_fsic_diff_v2(sending_fsic, receiving_fsic), expected_diff) \ No newline at end of file diff --git a/tests/testapp/tests/sync/test_operations.py b/tests/testapp/tests/sync/test_operations.py index fb0ad477..fbdab743 100644 --- a/tests/testapp/tests/sync/test_operations.py +++ b/tests/testapp/tests/sync/test_operations.py @@ -13,6 +13,7 @@ from ..helpers import create_buffer_and_store_dummy_data from ..helpers import create_dummy_store_data from morango.constants import transfer_statuses +from morango.constants.capabilities import FSIC_V2_FORMAT from morango.errors import MorangoLimitExceeded from morango.models.core import Buffer from morango.models.core import DatabaseIDModel @@ -21,6 +22,7 @@ from morango.models.core import RecordMaxCounterBuffer from morango.models.core import Store from morango.models.core import SyncSession +from morango.models.core import DatabaseMaxCounter from morango.models.core import TransferSession from morango.sync.backends.utils import load_backend from morango.sync.context import LocalSessionContext @@ -47,6 +49,24 @@ class Meta: name = factory.Sequence(lambda n: "Fac %d" % n) +def assertRecordsBuffered(records): + buffer_ids = Buffer.objects.values_list("model_uuid", flat=True) + rmcb_ids = RecordMaxCounterBuffer.objects.values_list("model_uuid", flat=True) + # ensure all store and buffer records are buffered + for i in records: + assert i.id in buffer_ids + assert i.id in rmcb_ids + + +def assertRecordsNotBuffered(records): + buffer_ids = Buffer.objects.values_list("model_uuid", flat=True) + rmcb_ids = RecordMaxCounterBuffer.objects.values_list("model_uuid", flat=True) + # ensure store and buffer records are not buffered + for i in records: + assert i.id not in buffer_ids + assert i.id not in rmcb_ids + + 
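# Illustrative sketch (assumed values, hypothetical instance IDs) of how the FSIC v2
# directional diff behaves: the sender's higher counters come back keyed by partition,
# each paired with the receiver's lower-bound counter (0 when the receiver has never
# seen that instance at all).
from morango.models.fsic_utils import calculate_directional_fsic_diff_v2

_sending_example = {"p": {"a" * 32: 5}, "p1": {"b" * 32: 3}}
_receiving_example = {"p": {"a" * 32: 2}}
assert calculate_directional_fsic_diff_v2(_sending_example, _receiving_example) == {
    "p": {"a" * 32: 2},  # receiver is behind on this instance under "p"
    "p1": {"b" * 32: 0},  # receiver has never seen this instance
}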
@override_settings(MORANGO_SERIALIZE_BEFORE_QUEUING=False) class QueueStoreIntoBufferTestCase(TestCase): def setUp(self): @@ -63,46 +83,27 @@ def setUp(self): is_server=False, ) - def assertRecordsBuffered(self, records): - buffer_ids = Buffer.objects.values_list("model_uuid", flat=True) - rmcb_ids = RecordMaxCounterBuffer.objects.values_list("model_uuid", flat=True) - # ensure all store and buffer records are buffered - for i in records: - self.assertIn(i.id, buffer_ids) - self.assertIn(i.id, rmcb_ids) - - def assertRecordsNotBuffered(self, records): - buffer_ids = Buffer.objects.values_list("model_uuid", flat=True) - rmcb_ids = RecordMaxCounterBuffer.objects.values_list("model_uuid", flat=True) - # ensure all store and buffer records are buffered - for i in records: - self.assertNotIn(i.id, buffer_ids) - self.assertNotIn(i.id, rmcb_ids) - def test_all_fsics(self): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure all store and buffer records are buffered - self.assertRecordsBuffered(self.data["group1_c1"]) - self.assertRecordsBuffered(self.data["group1_c2"]) - self.assertRecordsBuffered(self.data["group2_c1"]) + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) def test_very_many_fsics(self): """ Regression test against 'Expression tree is too large (maximum depth 1000)' error with many fsics """ fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} - fsics.update({ - uuid.uuid4().hex: i - for i in range(20000) - }) + fsics.update({uuid.uuid4().hex: i for i in range(20000)}) self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure all store and buffer records are buffered - self.assertRecordsBuffered(self.data["group1_c1"]) - self.assertRecordsBuffered(self.data["group1_c2"]) - self.assertRecordsBuffered(self.data["group2_c1"]) + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) @pytest.mark.skip("Takes 30+ seconds, manual run only") def test_very_very_many_fsics(self): @@ -111,23 +112,17 @@ def test_very_very_many_fsics(self): Maximum supported value: 99,999 """ fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} - fsics.update({ - uuid.uuid4().hex: i - for i in range(99999) - }) + fsics.update({uuid.uuid4().hex: i for i in range(99999)}) self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure all store and buffer records are buffered - self.assertRecordsBuffered(self.data["group1_c1"]) - self.assertRecordsBuffered(self.data["group1_c2"]) - self.assertRecordsBuffered(self.data["group2_c1"]) + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) def test_too_many_fsics(self): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} - fsics.update({ - uuid.uuid4().hex: i - for i in range(100000) - }) + fsics.update({uuid.uuid4().hex: i for i in range(100000)}) self.transfer_session.client_fsic = json.dumps(fsics) with self.assertRaises(MorangoLimitExceeded): _queue_into_buffer(self.transfer_session) @@ -137,9 +132,9 @@ def test_fsic_specific_id(self): self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure only 
records modified with 2nd instance id are buffered - self.assertRecordsNotBuffered(self.data["group1_c1"]) - self.assertRecordsNotBuffered(self.data["group1_c2"]) - self.assertRecordsBuffered(self.data["group2_c1"]) + assertRecordsNotBuffered(self.data["group1_c1"]) + assertRecordsNotBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) def test_fsic_counters(self): counter = InstanceIDModel.objects.get(id=self.data["group1_id"].id).counter @@ -149,9 +144,9 @@ def test_fsic_counters(self): self.transfer_session.server_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure only records with updated 1st instance id are buffered - self.assertRecordsBuffered(self.data["group1_c1"]) - self.assertRecordsBuffered(self.data["group1_c2"]) - self.assertRecordsNotBuffered(self.data["group2_c1"]) + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsNotBuffered(self.data["group2_c1"]) def test_fsic_counters_too_high(self): fsics = {self.data["group1_id"].id: 100, self.data["group2_id"].id: 100} @@ -171,9 +166,9 @@ def test_partition_filter_buffering(self): self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure records with different partition values are buffered - self.assertRecordsNotBuffered([self.data["user2"]]) - self.assertRecordsBuffered(self.data["user3_sumlogs"]) - self.assertRecordsBuffered(self.data["user3_interlogs"]) + assertRecordsNotBuffered([self.data["user2"]]) + assertRecordsBuffered(self.data["user3_sumlogs"]) + assertRecordsBuffered(self.data["user3_interlogs"]) def test_partition_prefix_buffering(self): fsics = {self.data["group2_id"].id: 1} @@ -182,10 +177,10 @@ def test_partition_prefix_buffering(self): self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure only records with user2 partition are buffered - self.assertRecordsBuffered([self.data["user2"]]) - self.assertRecordsBuffered(self.data["user2_sumlogs"]) - self.assertRecordsBuffered(self.data["user2_interlogs"]) - self.assertRecordsNotBuffered([self.data["user3"]]) + assertRecordsBuffered([self.data["user2"]]) + assertRecordsBuffered(self.data["user2_sumlogs"]) + assertRecordsBuffered(self.data["user2_interlogs"]) + assertRecordsNotBuffered([self.data["user3"]]) def test_partition_and_fsic_buffering(self): filter_prefixes = "{}:user:summary".format(self.data["user1"].id) @@ -194,9 +189,9 @@ def test_partition_and_fsic_buffering(self): self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure records updated with 1st instance id and summarylog partition are buffered - self.assertRecordsBuffered(self.data["user1_sumlogs"]) - self.assertRecordsNotBuffered(self.data["user2_sumlogs"]) - self.assertRecordsNotBuffered(self.data["user3_sumlogs"]) + assertRecordsBuffered(self.data["user1_sumlogs"]) + assertRecordsNotBuffered(self.data["user2_sumlogs"]) + assertRecordsNotBuffered(self.data["user3_sumlogs"]) def test_valid_fsic_but_invalid_partition(self): filter_prefixes = "{}:user:summary".format(self.data["user1"].id) @@ -205,7 +200,7 @@ def test_valid_fsic_but_invalid_partition(self): self.transfer_session.client_fsic = json.dumps(fsics) _queue_into_buffer(self.transfer_session) # ensure that record with valid fsic but invalid partition is not buffered - self.assertRecordsNotBuffered([self.data["user4"]]) + assertRecordsNotBuffered([self.data["user4"]]) def 
test_local_initialize_operation__server(self): self.transfer_session.active = False @@ -223,7 +218,9 @@ def test_local_initialize_operation__server(self): operation = InitializeOperation() self.assertEqual(transfer_statuses.COMPLETED, operation.handle(self.context)) self.context.update.assert_called_once() - transfer_session = self.context.update.call_args_list[0][1].get("transfer_session") + transfer_session = self.context.update.call_args_list[0][1].get( + "transfer_session" + ) self.assertEqual(id, transfer_session.id) self.assertEqual(records_total, transfer_session.records_total) self.assertEqual(client_fsic, transfer_session.client_fsic) @@ -241,7 +238,9 @@ def test_local_initialize_operation__resume(self): self.context.transfer_session = None operation = InitializeOperation() self.assertEqual(transfer_statuses.COMPLETED, operation.handle(self.context)) - self.context.update.assert_called_once_with(transfer_session=self.transfer_session) + self.context.update.assert_called_once_with( + transfer_session=self.transfer_session + ) def test_local_queue_operation(self): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} @@ -253,9 +252,9 @@ def test_local_queue_operation(self): self.assertNotEqual(0, self.transfer_session.records_total) # ensure all store and buffer records are buffered - self.assertRecordsBuffered(self.data["group1_c1"]) - self.assertRecordsBuffered(self.data["group1_c2"]) - self.assertRecordsBuffered(self.data["group2_c1"]) + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) @mock.patch("morango.sync.operations._queue_into_buffer") def test_local_queue_operation__noop(self, mock_queue): @@ -271,6 +270,160 @@ def test_local_queue_operation__noop(self, mock_queue): mock_queue.assert_not_called() +@override_settings( + MORANGO_SERIALIZE_BEFORE_QUEUING=False, MORANGO_DISABLE_FSIC_V2_FORMAT=False +) +class FSICPartitionEdgeCaseQueuingTestCase(TestCase): + def setUp(self): + # instance IDs + self.i1 = "a" * 32 + self.i2 = "b" * 32 + self.i3 = "c" * 32 + self.i4 = "d" * 32 + + def create_stores(self, store_tuples): + records = [] + for instance_id, partition, counter in store_tuples: + record = Store.objects.create( + id=uuid.uuid4().hex, + last_saved_instance=instance_id, + last_saved_counter=counter, + partition=partition, + profile="facilitydata", + source_id="qqq", + model_name="qqq", + ) + RecordMaxCounter.objects.create( + store_model=record, + instance_id=instance_id, + counter=counter, + ) + records.append(record) + return records + + def create_dmcs(self, dmc_tuples): + self.clear_dmcs() + for instance_id, partition, counter in dmc_tuples: + DatabaseMaxCounter.objects.create( + instance_id=instance_id, partition=partition, counter=counter + ) + + def clear_dmcs(self): + DatabaseMaxCounter.objects.all().delete() + + def fsic_from_dmcs(self, filters, dmc_tuples, is_producer=False): + self.create_dmcs(dmc_tuples) + return DatabaseMaxCounter.calculate_filter_specific_instance_counters(filters, is_producer=is_producer) + + def initialize_sessions(self, filters): + # create controllers for store/buffer operations + conn = mock.Mock(spec="morango.sync.syncsession.NetworkSyncConnection") + conn.server_info = dict(capabilities=[FSIC_V2_FORMAT]) + self.profile_controller = MorangoProfileController("facilitydata") + self.transfer_client = TransferClient(conn, "host", SessionController.build()) + self.sync_session = SyncSession.objects.create( + id=uuid.uuid4().hex, + 
profile="facilitydata", + last_activity_timestamp=timezone.now(), + ) + self.transfer_session = ( + self.sync_session.current_transfer_session + ) = TransferSession.objects.create( + id=uuid.uuid4().hex, + sync_session=self.sync_session, + push=True, + last_activity_timestamp=timezone.now(), + ) + + self.transfer_session.filter = str(filters) + + self.context = mock.Mock( + spec=LocalSessionContext, + transfer_session=self.transfer_session, + sync_session=self.sync_session, + filter=self.transfer_session.get_filter(), + is_push=self.transfer_session.push, + is_server=False, + ) + + def set_sender_fsic_from_dmcs(self, dmc_tuples): + self.transfer_session.client_fsic = json.dumps( + self.fsic_from_dmcs(self.context.filter, dmc_tuples, is_producer=True) + ) + + def set_receiver_fsic_from_dmcs(self, dmc_tuples): + self.transfer_session.server_fsic = json.dumps( + self.fsic_from_dmcs(self.context.filter, dmc_tuples, is_producer=False) + ) + + def queue(self): + _queue_into_buffer(self.transfer_session) + + def test_soud_to_full_to_full(self): + """ + This tests the scenario discovered through S2S, of a SoUD tablet syncing data to a school server, + which then syncs to KDP, but missed out on the data from the SoUD tablet. + """ + # we'll be doing a full sync + self.initialize_sessions(filters="p") + + tablet_tuples = [ + (self.i1, "p1", 6), + ] + laptop_tuples = [ + (self.i3, "p", 5), + ] + kdp_tuples = [ + (self.i3, "p", 3), + (self.i4, "p", 7), + ] + + tablet_data = self.create_stores(tablet_tuples) + laptop_data = self.create_stores(laptop_tuples) + + self.set_sender_fsic_from_dmcs(tablet_tuples + laptop_tuples) + self.set_receiver_fsic_from_dmcs(kdp_tuples) + self.queue() + assertRecordsBuffered(laptop_data) + assertRecordsBuffered(tablet_data) + + def test_soud_to_full_to_soud(self): + """ + This tests the scenario discovered even earlier through S2S, of a SoUD tablet syncing data to a school server, + and then syncing back to a new SoUD tablet, which missed out on the data from the original SoUD tablet. 
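        In this subset sync, the school server's records stored directly under the bare "p"
        partition fall outside the "p1"/"p2" filter and should not be buffered, while its
        copies of the tablet's "p1"/"p2" data should be.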
+ """ + # we'll be doing a subset sync + self.initialize_sessions(filters="\n".join(["p1", "p2"])) + + tablet1_tuples = [ + (self.i1, "p1", 6), + (self.i1, "p2", 1), + ] + laptop_tuples_data_included = [ + (self.i3, "p1", 5), + (self.i3, "p2", 2), + ] + laptop_tuples_data_excluded = [ + (self.i3, "p", 5), + ] + laptop_tuples_dmcs = [ + (self.i3, "p", 5), + ] + tablet2_tuples = [] + + tablet_data = self.create_stores(tablet1_tuples) + laptop_data_included = self.create_stores(laptop_tuples_data_included) + laptop_data_excluded = self.create_stores(laptop_tuples_data_excluded) + + self.set_sender_fsic_from_dmcs(tablet1_tuples + laptop_tuples_dmcs) + self.set_receiver_fsic_from_dmcs(tablet2_tuples) + self.queue() + assertRecordsBuffered(laptop_data_included) + assertRecordsNotBuffered(laptop_data_excluded) + assertRecordsBuffered(tablet_data) + + + @override_settings(MORANGO_DESERIALIZE_AFTER_DEQUEUING=False) class DequeueBufferIntoStoreTestCase(TestCase): def setUp(self): @@ -315,18 +468,27 @@ def assert_store_records_not_tagged_with_last_session(self, store_ids): session_id = self.data["sc"].current_transfer_session.id for store_id in store_ids: try: - assert Store.objects.get(id=store_id).last_transfer_session_id != session_id + assert ( + Store.objects.get(id=store_id).last_transfer_session_id + != session_id + ) except Store.DoesNotExist: pass def test_dequeuing_sets_last_session(self): - store_ids = [self.data[key] for key in ["model2", "model3", "model4", "model5", "model7"]] + store_ids = [ + self.data[key] for key in ["model2", "model3", "model4", "model5", "model7"] + ] self.assert_store_records_not_tagged_with_last_session(store_ids) _dequeue_into_store(self.data["sc"].current_transfer_session) # this one is a reverse fast forward, so it doesn't modify the store record and shouldn't be tagged self.assert_store_records_not_tagged_with_last_session([self.data["model1"]]) self.assert_store_records_tagged_with_last_session(store_ids) - tagged_actual = set(self.data["sc"].current_transfer_session.get_touched_record_ids_for_model("facility")) + tagged_actual = set( + self.data["sc"].current_transfer_session.get_touched_record_ids_for_model( + "facility" + ) + ) tagged_expected = set(store_ids) assert tagged_actual == tagged_expected @@ -679,7 +841,9 @@ def test_local_cleanup(self): Buffer.objects.filter(transfer_session_id=self.transfer_session.id).exists() ) self.assertTrue( - RecordMaxCounterBuffer.objects.filter(transfer_session_id=self.transfer_session.id).exists() + RecordMaxCounterBuffer.objects.filter( + transfer_session_id=self.transfer_session.id + ).exists() ) self.assertEqual(transfer_statuses.COMPLETED, operation.handle(self.context)) self.assertFalse(self.transfer_session.active) @@ -687,5 +851,7 @@ def test_local_cleanup(self): Buffer.objects.filter(transfer_session_id=self.transfer_session.id).exists() ) self.assertFalse( - RecordMaxCounterBuffer.objects.filter(transfer_session_id=self.transfer_session.id).exists() + RecordMaxCounterBuffer.objects.filter( + transfer_session_id=self.transfer_session.id + ).exists() ) diff --git a/tests/testapp/tests/test_utils.py b/tests/testapp/tests/test_utils.py index 0d48225b..39449bd3 100644 --- a/tests/testapp/tests/test_utils.py +++ b/tests/testapp/tests/test_utils.py @@ -7,6 +7,7 @@ from morango.constants.capabilities import ALLOW_CERTIFICATE_PUSHING from morango.constants.capabilities import ASYNC_OPERATIONS +from morango.constants.capabilities import FSIC_V2_FORMAT from morango.constants import transfer_stages from morango.utils 
import SETTINGS from morango.utils import CAPABILITIES_CLIENT_HEADER @@ -29,6 +30,7 @@ def test_defaults(self): self.assertEqual(SETTINGS.MORANGO_SERIALIZE_BEFORE_QUEUING, True) self.assertEqual(SETTINGS.MORANGO_DESERIALIZE_AFTER_DEQUEUING, True) self.assertEqual(SETTINGS.MORANGO_DISALLOW_ASYNC_OPERATIONS, False) + self.assertEqual(SETTINGS.MORANGO_DISABLE_FSIC_V2_FORMAT, False) self.assertLength(3, SETTINGS.MORANGO_INITIALIZE_OPERATIONS) self.assertLength(3, SETTINGS.MORANGO_SERIALIZE_OPERATIONS) self.assertLength(4, SETTINGS.MORANGO_QUEUE_OPERATIONS) @@ -59,6 +61,13 @@ def test_get_capabilities__async_ops(self): with self.settings(MORANGO_DISALLOW_ASYNC_OPERATIONS=True): self.assertNotIn(ASYNC_OPERATIONS, get_capabilities()) + def test_get_capabilities__fsic_v2_format(self): + with self.settings(MORANGO_DISABLE_FSIC_V2_FORMAT=False): + self.assertIn(FSIC_V2_FORMAT, get_capabilities()) + + with self.settings(MORANGO_DISABLE_FSIC_V2_FORMAT=True): + self.assertNotIn(FSIC_V2_FORMAT, get_capabilities()) + @mock.patch("morango.utils.CAPABILITIES", ("TEST", "SERIALIZE")) def test_serialize(self): req = Request() From 7c86684d67adf4406d62b7b60ae75387d84188a6 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Mon, 24 Jan 2022 23:04:36 -0800 Subject: [PATCH 2/8] Further updates to FSIC v2 work, tests almost all pass --- morango/errors.py | 4 + morango/models/certificates.py | 2 +- morango/models/core.py | 14 +- morango/sync/context.py | 2 +- morango/sync/operations.py | 49 +++--- tests/testapp/tests/sync/test_operations.py | 177 +++++++++++++++++--- 6 files changed, 197 insertions(+), 51 deletions(-) diff --git a/morango/errors.py b/morango/errors.py index 5885bd2e..4af17aa0 100644 --- a/morango/errors.py +++ b/morango/errors.py @@ -68,3 +68,7 @@ class MorangoLimitExceeded(MorangoError): class InvalidMorangoSourceId(MorangoError): pass + + +class MorangoInvalidFSICPartition(MorangoError): + pass \ No newline at end of file diff --git a/morango/models/certificates.py b/morango/models/certificates.py index c915db69..72201436 100644 --- a/morango/models/certificates.py +++ b/morango/models/certificates.py @@ -338,7 +338,7 @@ def __init__(self, template, params={}): self._template = template self._params = params self._filter_string = string.Template(template).safe_substitute(params) - self._filter_tuple = tuple(self._filter_string.split()) + self._filter_tuple = tuple(self._filter_string.split()) or ("",) def is_subset_of(self, other): for partition in self._filter_tuple: diff --git a/morango/models/core.py b/morango/models/core.py index bcf17188..faa9e2d2 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -582,7 +582,7 @@ class Meta: @classmethod @transaction.atomic - def update_fsics(cls, fsics, sync_filter, v2_format=True): + def update_fsics(cls, fsics, sync_filter, v2_format=False): internal_fsic = DatabaseMaxCounter.calculate_filter_specific_instance_counters( sync_filter, v2_format=v2_format ) @@ -614,7 +614,7 @@ def update_fsics(cls, fsics, sync_filter, v2_format=True): ) @classmethod - def get_instance_counters_for_partition(cls, part, only_if_in_store=False): + def get_instance_counters_for_partition(cls, part, only_if_in_store=True): """ Return a dict of {instance_id: counter} for DMC's with the given partition. If only_if_in_store is True, only return DMCs whose instance_id is still in the Store. 
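# A rough sketch of the intended behaviour, using hypothetical instance IDs A and B:
# if both have DatabaseMaxCounter rows for partition "p1", but only A still appears as
# last_saved_instance on Store rows under "p1", then
#   DatabaseMaxCounter.get_instance_counters_for_partition("p1") -> {A: <A's counter>}
#   DatabaseMaxCounter.get_instance_counters_for_partition("p1", only_if_in_store=False)
#       -> {A: <A's counter>, B: <B's counter>}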
@@ -634,9 +634,8 @@ def calculate_filter_specific_instance_counters( cls, filters, is_producer=False, - v2_format=True, - only_if_in_store=False, - remove_redundant=True, + v2_format=False, + only_if_in_store=True, ): if v2_format: @@ -685,9 +684,8 @@ def calculate_filter_specific_instance_counters( "sub": sub_fsics, } - # if requested, remove instance counters on partitions that are redudant to counters on super partitions - if remove_redundant: - remove_redundant_instance_counters(raw_fsic) + # remove instance counters on partitions that are redundant to counters on super partitions + remove_redundant_instance_counters(raw_fsic) return raw_fsic diff --git a/morango/sync/context.py b/morango/sync/context.py index 5c30afd8..9a60f40e 100644 --- a/morango/sync/context.py +++ b/morango/sync/context.py @@ -39,7 +39,7 @@ def __init__( :type sync_filter Filter|None :param is_push: A boolean indicating whether or not the transfer is a push or pull :type is_push: bool - :param capabilities: Capabilities set that is combined (union) against our own capabilities + :param capabilities: Capabilities set that is combined (intersected) with our own capabilities :type capabilities: set|None """ self.sync_session = sync_session diff --git a/morango/sync/operations.py b/morango/sync/operations.py index 829d7866..34afae7c 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -18,10 +18,12 @@ from morango.api.serializers import BufferSerializer from morango.constants.capabilities import ASYNC_OPERATIONS +from morango.constants.capabilities import FSIC_V2_FORMAT from morango.constants import transfer_stages from morango.constants import transfer_statuses from morango.errors import MorangoResumeSyncError from morango.errors import MorangoLimitExceeded +from morango.errors import MorangoInvalidFSICPartition from morango.models.certificates import Filter from morango.models.core import Buffer from morango.models.core import DatabaseMaxCounter @@ -398,9 +400,10 @@ def _deserialize_from_store(profile, skip_erroring=False, filter=None): @transaction.atomic(using=USING_DB) -def _queue_into_buffer(transfersession): +def _queue_into_buffer_v1(transfersession): """ - Takes a chunk of data from the store to be put into the buffer to be sent to another morango instance. + Takes a chunk of data from the store to be put into the buffer to be sent to another morango instance. This is the legacy + code to handle backwards compatibility with older versions of Morango, with the v1 version of the FSIC data structure. ALGORITHM: We do Filter Specific Instance Counter arithmetic to get our newest data compared to the server's older data. We use raw sql queries to place data in the buffer and the record max counter buffer, which matches the conditions of the FSIC, @@ -410,9 +413,6 @@ def _queue_into_buffer(transfersession): server_fsic = json.loads(transfersession.server_fsic) client_fsic = json.loads(transfersession.client_fsic) - if "sub" in server_fsic: - return _queue_into_buffer_v2(transfersession) - if transfersession.push: fsics = calculate_directional_fsic_diff(client_fsic, server_fsic) else: @@ -529,13 +529,13 @@ def _queue_into_buffer_v2(transfersession): """ Takes a chunk of data from the store to be put into the buffer to be sent to another morango instance. + This version uses the new v2 FSIC format that is split out by partition, divided into sub partitions (the ones under the filter) + and super partitions (prefixes of the sub partitions). 
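+    For a filter of "p1", the FSICs might look like (instance IDs abbreviated, values assumed):
+        {"super": {"p": {"a...": 5}}, "sub": {"p1": {"b...": 7}}}
+    where "p" is a prefix of the filter partition and "p1" falls under it.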
+ ALGORITHM: We do Filter Specific Instance Counter arithmetic to get our newest data compared to the server's older data. We use raw sql queries to place data in the buffer and the record max counter buffer, which matches the conditions of the FSIC. - - This version uses the new FSIC format that is split out by partition, divided into sub partitions (the ones under the filter) - and super partitions (prefixes of the sub partitions). """ - filter_prefixes = Filter(transfersession.filter) + sync_filter = Filter(transfersession.filter) server_fsic = json.loads(transfersession.server_fsic) client_fsic = json.loads(transfersession.client_fsic) @@ -544,6 +544,11 @@ def _queue_into_buffer_v2(transfersession): assert "sub" in client_fsic assert "super" in client_fsic + # ensure that the partitions in the FSICs are under the current filter, before using them + for partition in itertools.chain(server_fsic["sub"].keys(), client_fsic["sub"]): + if partition not in sync_filter: + raise MorangoInvalidFSICPartition("Partition '{}' is not in filter".format(partition)) + server_fsic = expand_fsic_for_use(server_fsic) client_fsic = expand_fsic_for_use(client_fsic) @@ -582,11 +587,8 @@ def _queue_into_buffer_v2(transfersession): profile_condition + partition_conditions, "AND" ) - select_buffers = [] - select_rmc_buffers = [] - # execute raw sql to take all records that match condition, to be put into buffer for transfer - select_buffers.append( + select_buffer_query = ( """SELECT DISTINCT id, serialized, deleted, last_saved_instance, last_saved_counter, hard_deleted, model_name, profile, partition, source_id, conflicting_serialized_data, @@ -600,7 +602,7 @@ def _queue_into_buffer_v2(transfersession): ) ) # take all record max counters that are foreign keyed onto store models, which were queued into the buffer - select_rmc_buffers.append( + select_rmc_buffer_query = ( """SELECT instance_id, counter, CAST ('{transfer_session_id}' AS {transfer_session_id_type}), store_model_id FROM {record_max_counter} AS rmc INNER JOIN {outgoing_buffer} AS buffer ON rmc.store_model_id = buffer.model_uuid @@ -622,7 +624,7 @@ def _queue_into_buffer_v2(transfersession): {select} """.format( outgoing_buffer=Buffer._meta.db_table, - select=" UNION ".join(select_buffers), + select=select_buffer_query, ) ) cursor.execute( @@ -631,7 +633,7 @@ def _queue_into_buffer_v2(transfersession): {select} """.format( outgoing_rmcb=RecordMaxCounterBuffer._meta.db_table, - select=" UNION ".join(select_rmc_buffers), + select=select_rmc_buffer_query, ) ) @@ -799,7 +801,9 @@ def handle(self, context): fsic = json.dumps( DatabaseMaxCounter.calculate_filter_specific_instance_counters( - context.filter, is_producer=context.is_producer + context.filter, + is_producer=context.is_producer, + v2_format=FSIC_V2_FORMAT in context.capabilities, ) ) if context.is_server: @@ -826,7 +830,10 @@ def handle(self, context): self._assert(context.sync_session is not None) self._assert(context.transfer_session is not None) - _queue_into_buffer(context.transfer_session) + if FSIC_V2_FORMAT in context.capabilities: + _queue_into_buffer_v2(context.transfer_session) + else: + _queue_into_buffer_v1(context.transfer_session) # update the records_total for client and server transfer session records_total = Buffer.objects.filter( @@ -978,7 +985,11 @@ def handle(self, context): if context.is_server else context.transfer_session.server_fsic ) - DatabaseMaxCounter.update_fsics(json.loads(fsic), context.filter) + DatabaseMaxCounter.update_fsics( + json.loads(fsic), + 
context.filter, + v2_format=FSIC_V2_FORMAT in context.capabilities, + ) return transfer_statuses.COMPLETED diff --git a/tests/testapp/tests/sync/test_operations.py b/tests/testapp/tests/sync/test_operations.py index fbdab743..9f5be780 100644 --- a/tests/testapp/tests/sync/test_operations.py +++ b/tests/testapp/tests/sync/test_operations.py @@ -29,7 +29,8 @@ from morango.sync.controller import MorangoProfileController from morango.sync.controller import SessionController from morango.sync.operations import _dequeue_into_store -from morango.sync.operations import _queue_into_buffer +from morango.sync.operations import _queue_into_buffer_v1 +from morango.sync.operations import _queue_into_buffer_v2 from morango.sync.operations import CleanupOperation from morango.sync.operations import ReceiverDequeueOperation from morango.sync.operations import ProducerDequeueOperation @@ -67,13 +68,12 @@ def assertRecordsNotBuffered(records): assert i.id not in rmcb_ids -@override_settings(MORANGO_SERIALIZE_BEFORE_QUEUING=False) -class QueueStoreIntoBufferTestCase(TestCase): +@override_settings(MORANGO_SERIALIZE_BEFORE_QUEUING=False, MORANGO_DISABLE_FSIC_V2_FORMAT=True) +class QueueStoreIntoBufferV1TestCase(TestCase): def setUp(self): - super(QueueStoreIntoBufferTestCase, self).setUp() + super(QueueStoreIntoBufferV1TestCase, self).setUp() self.data = create_dummy_store_data() self.transfer_session = self.data["sc"].current_transfer_session - # self.transfer_session.filter = "abc123" self.context = mock.Mock( spec=LocalSessionContext, transfer_session=self.transfer_session, @@ -81,12 +81,13 @@ def setUp(self): filter=self.transfer_session.get_filter(), is_push=self.transfer_session.push, is_server=False, + capabilities=[], ) def test_all_fsics(self): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure all store and buffer records are buffered assertRecordsBuffered(self.data["group1_c1"]) assertRecordsBuffered(self.data["group1_c2"]) @@ -99,7 +100,7 @@ def test_very_many_fsics(self): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} fsics.update({uuid.uuid4().hex: i for i in range(20000)}) self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure all store and buffer records are buffered assertRecordsBuffered(self.data["group1_c1"]) assertRecordsBuffered(self.data["group1_c2"]) @@ -114,7 +115,7 @@ def test_very_very_many_fsics(self): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} fsics.update({uuid.uuid4().hex: i for i in range(99999)}) self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure all store and buffer records are buffered assertRecordsBuffered(self.data["group1_c1"]) assertRecordsBuffered(self.data["group1_c2"]) @@ -125,12 +126,12 @@ def test_too_many_fsics(self): fsics.update({uuid.uuid4().hex: i for i in range(100000)}) self.transfer_session.client_fsic = json.dumps(fsics) with self.assertRaises(MorangoLimitExceeded): - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) def test_fsic_specific_id(self): fsics = {self.data["group2_id"].id: 1} self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + 
_queue_into_buffer_v1(self.transfer_session) # ensure only records modified with 2nd instance id are buffered assertRecordsNotBuffered(self.data["group1_c1"]) assertRecordsNotBuffered(self.data["group1_c2"]) @@ -142,7 +143,7 @@ def test_fsic_counters(self): self.transfer_session.client_fsic = json.dumps(fsics) fsics[self.data["group1_id"].id] = 0 self.transfer_session.server_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure only records with updated 1st instance id are buffered assertRecordsBuffered(self.data["group1_c1"]) assertRecordsBuffered(self.data["group1_c2"]) @@ -152,7 +153,7 @@ def test_fsic_counters_too_high(self): fsics = {self.data["group1_id"].id: 100, self.data["group2_id"].id: 100} self.transfer_session.client_fsic = json.dumps(fsics) self.transfer_session.server_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure no records are buffered self.assertFalse(Buffer.objects.all()) self.assertFalse(RecordMaxCounterBuffer.objects.all()) @@ -164,7 +165,7 @@ def test_partition_filter_buffering(self): ) self.transfer_session.filter = filter_prefixes self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure records with different partition values are buffered assertRecordsNotBuffered([self.data["user2"]]) assertRecordsBuffered(self.data["user3_sumlogs"]) @@ -175,7 +176,7 @@ def test_partition_prefix_buffering(self): filter_prefixes = "{}".format(self.data["user2"].id) self.transfer_session.filter = filter_prefixes self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure only records with user2 partition are buffered assertRecordsBuffered([self.data["user2"]]) assertRecordsBuffered(self.data["user2_sumlogs"]) @@ -187,7 +188,7 @@ def test_partition_and_fsic_buffering(self): fsics = {self.data["group1_id"].id: 1} self.transfer_session.filter = filter_prefixes self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure records updated with 1st instance id and summarylog partition are buffered assertRecordsBuffered(self.data["user1_sumlogs"]) assertRecordsNotBuffered(self.data["user2_sumlogs"]) @@ -198,7 +199,7 @@ def test_valid_fsic_but_invalid_partition(self): fsics = {self.data["group2_id"].id: 1} self.transfer_session.filter = filter_prefixes self.transfer_session.client_fsic = json.dumps(fsics) - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v1(self.transfer_session) # ensure that record with valid fsic but invalid partition is not buffered assertRecordsNotBuffered([self.data["user4"]]) @@ -256,7 +257,7 @@ def test_local_queue_operation(self): assertRecordsBuffered(self.data["group1_c2"]) assertRecordsBuffered(self.data["group2_c1"]) - @mock.patch("morango.sync.operations._queue_into_buffer") + @mock.patch("morango.sync.operations._queue_into_buffer_v1") def test_local_queue_operation__noop(self, mock_queue): fsics = {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1} self.transfer_session.client_fsic = json.dumps(fsics) @@ -270,6 +271,136 @@ def test_local_queue_operation__noop(self, mock_queue): mock_queue.assert_not_called() +@override_settings(MORANGO_SERIALIZE_BEFORE_QUEUING=False, MORANGO_DISABLE_FSIC_V2_FORMAT=False) +class 
QueueStoreIntoBufferV2TestCase(TestCase): + def setUp(self): + super(QueueStoreIntoBufferV2TestCase, self).setUp() + self.data = create_dummy_store_data() + self.transfer_session = self.data["sc"].current_transfer_session + self.context = mock.Mock( + spec=LocalSessionContext, + transfer_session=self.transfer_session, + sync_session=self.transfer_session.sync_session, + filter=self.transfer_session.get_filter(), + is_push=self.transfer_session.push, + is_server=False, + capabilities=[FSIC_V2_FORMAT], + ) + + def test_many_fsics(self): + """ + Testing the limits of how many instances can be included before we hit 'Expression tree is too large' + """ + fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1}}} + fsics["sub"][""].update({uuid.uuid4().hex: i for i in range(995)}) + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + _queue_into_buffer_v2(self.transfer_session) + # ensure all store and buffer records are buffered + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) + + # def test_very_many_fsics(self): + # """ + # Regression test against 'Expression tree is too large (maximum depth 1000)' error with many fsics + # """ + # fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1}}} + # fsics["sub"][""].update({uuid.uuid4().hex: i for i in range(20000)}) + # self.transfer_session.client_fsic = json.dumps(fsics) + # self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + # _queue_into_buffer_v2(self.transfer_session) + # # ensure all store and buffer records are buffered + # assertRecordsBuffered(self.data["group1_c1"]) + # assertRecordsBuffered(self.data["group1_c2"]) + # assertRecordsBuffered(self.data["group2_c1"]) + + # @pytest.mark.skip("Takes 30+ seconds, manual run only") + # def test_very_very_many_fsics(self): + # """ + # Regression test against 'Expression tree is too large (maximum depth 1000)' error with many fsics + # Maximum supported value: 99,999 + # """ + # fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1}}} + # fsics["sub"][""].update({uuid.uuid4().hex: i for i in range(99999)}) + # self.transfer_session.client_fsic = json.dumps(fsics) + # self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + # _queue_into_buffer_v2(self.transfer_session) + # # ensure all store and buffer records are buffered + # assertRecordsBuffered(self.data["group1_c1"]) + # assertRecordsBuffered(self.data["group1_c2"]) + # assertRecordsBuffered(self.data["group2_c1"]) + + def test_fsic_specific_id(self): + fsics = {"super": {}, "sub": {"": {self.data["group2_id"].id: 1}}} + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + _queue_into_buffer_v2(self.transfer_session) + # ensure only records modified with 2nd instance id are buffered + assertRecordsNotBuffered(self.data["group1_c1"]) + assertRecordsNotBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) + + def test_fsic_counters(self): + counter = InstanceIDModel.objects.get(id=self.data["group1_id"].id).counter + fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: counter - 1}}} + self.transfer_session.client_fsic = json.dumps(fsics) + fsics["sub"][""][self.data["group1_id"].id] = 0 + 
self.transfer_session.server_fsic = json.dumps(fsics) + _queue_into_buffer_v2(self.transfer_session) + # ensure only records with updated 1st instance id are buffered + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsNotBuffered(self.data["group2_c1"]) + + def test_fsic_counters_too_high(self): + fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: 100, self.data["group2_id"].id: 100}}} + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps(fsics) + _queue_into_buffer_v2(self.transfer_session) + # ensure no records are buffered + self.assertFalse(Buffer.objects.all()) + self.assertFalse(RecordMaxCounterBuffer.objects.all()) + + def test_valid_fsic_but_invalid_partition(self): + filter_prefixes = "{}:user:summary".format(self.data["user1"].id) + fsics = {"super": {}, "sub": {filter_prefixes: {self.data["group2_id"].id: 1}}} + self.transfer_session.filter = filter_prefixes + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + _queue_into_buffer_v2(self.transfer_session) + # ensure that record with valid fsic but invalid partition is not buffered + assertRecordsNotBuffered([self.data["user4"]]) + + def test_local_queue_operation(self): + fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1}}} + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + self.assertEqual(0, self.transfer_session.records_total or 0) + operation = ProducerQueueOperation() + self.assertEqual(transfer_statuses.COMPLETED, operation.handle(self.context)) + self.assertNotEqual(0, self.transfer_session.records_total) + + # ensure all store and buffer records are buffered + assertRecordsBuffered(self.data["group1_c1"]) + assertRecordsBuffered(self.data["group1_c2"]) + assertRecordsBuffered(self.data["group2_c1"]) + + @mock.patch("morango.sync.operations._queue_into_buffer_v2") + def test_local_queue_operation__noop(self, mock_queue): + fsics = {"super": {}, "sub": {"": {self.data["group1_id"].id: 1, self.data["group2_id"].id: 1}}} + self.transfer_session.client_fsic = json.dumps(fsics) + self.transfer_session.server_fsic = json.dumps({"super": {}, "sub": {}}) + + # as server, for push, operation should not queue into buffer + self.context.is_push = True + self.context.is_server = True + + operation = ReceiverQueueOperation() + self.assertEqual(transfer_statuses.COMPLETED, operation.handle(self.context)) + mock_queue.assert_not_called() + + @override_settings( MORANGO_SERIALIZE_BEFORE_QUEUING=False, MORANGO_DISABLE_FSIC_V2_FORMAT=False ) @@ -311,9 +442,9 @@ def create_dmcs(self, dmc_tuples): def clear_dmcs(self): DatabaseMaxCounter.objects.all().delete() - def fsic_from_dmcs(self, filters, dmc_tuples, is_producer=False): + def fsic_from_dmcs(self, filters, dmc_tuples): self.create_dmcs(dmc_tuples) - return DatabaseMaxCounter.calculate_filter_specific_instance_counters(filters, is_producer=is_producer) + return DatabaseMaxCounter.calculate_filter_specific_instance_counters(filters, v2_format=True) def initialize_sessions(self, filters): # create controllers for store/buffer operations @@ -344,20 +475,21 @@ def initialize_sessions(self, filters): filter=self.transfer_session.get_filter(), is_push=self.transfer_session.push, is_server=False, + capabilities=[FSIC_V2_FORMAT], ) def set_sender_fsic_from_dmcs(self, 
dmc_tuples): self.transfer_session.client_fsic = json.dumps( - self.fsic_from_dmcs(self.context.filter, dmc_tuples, is_producer=True) + self.fsic_from_dmcs(self.context.filter, dmc_tuples) ) def set_receiver_fsic_from_dmcs(self, dmc_tuples): self.transfer_session.server_fsic = json.dumps( - self.fsic_from_dmcs(self.context.filter, dmc_tuples, is_producer=False) + self.fsic_from_dmcs(self.context.filter, dmc_tuples) ) def queue(self): - _queue_into_buffer(self.transfer_session) + _queue_into_buffer_v2(self.transfer_session) def test_soud_to_full_to_full(self): """ @@ -457,6 +589,7 @@ def setUp(self): transfer_session=self.transfer_session, sync_session=self.transfer_session.sync_session, is_server=True, + capabilities=[], ) def assert_store_records_tagged_with_last_session(self, store_ids): From ccba3ad15417fe27355fb9297b77ae47ef25b786 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Wed, 26 Jan 2022 22:01:07 -0800 Subject: [PATCH 3/8] Linting and additional test for FSIC v2 --- morango/constants/capabilities.py | 2 +- morango/errors.py | 2 +- morango/models/core.py | 1 - morango/models/fsic_utils.py | 14 ++++++---- morango/sync/operations.py | 1 - tests/testapp/tests/sync/test_operations.py | 29 +++++++++++++++++++++ 6 files changed, 40 insertions(+), 9 deletions(-) diff --git a/morango/constants/capabilities.py b/morango/constants/capabilities.py index 031e50b9..840626b4 100644 --- a/morango/constants/capabilities.py +++ b/morango/constants/capabilities.py @@ -1,4 +1,4 @@ GZIP_BUFFER_POST = "GZIP_BUFFER_POST" ALLOW_CERTIFICATE_PUSHING = "ALLOW_CERTIFICATE_PUSHING" ASYNC_OPERATIONS = "ASYNC_OPERATIONS" -FSIC_V2_FORMAT = "FSIC_V2_FORMAT" \ No newline at end of file +FSIC_V2_FORMAT = "FSIC_V2_FORMAT" diff --git a/morango/errors.py b/morango/errors.py index 4af17aa0..fed9b403 100644 --- a/morango/errors.py +++ b/morango/errors.py @@ -71,4 +71,4 @@ class InvalidMorangoSourceId(MorangoError): class MorangoInvalidFSICPartition(MorangoError): - pass \ No newline at end of file + pass diff --git a/morango/models/core.py b/morango/models/core.py index faa9e2d2..32abbf0a 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -1,5 +1,4 @@ from __future__ import unicode_literals -from collections import defaultdict import functools import json diff --git a/morango/models/fsic_utils.py b/morango/models/fsic_utils.py index 4cf578a3..e1436bca 100644 --- a/morango/models/fsic_utils.py +++ b/morango/models/fsic_utils.py @@ -85,7 +85,7 @@ def calculate_directional_fsic_diff(fsic1, fsic2): """ Calculate the (instance_id, counter) pairs that are the lower-bound levels for sending data from the device with fsic1 to the device with fsic2. - + :param fsic1: dict containing (instance_id, counter) pairs for the sending device :param fsic2: dict containing (instance_id, counter) pairs for the receiving device :return ``dict`` of fsics to be used in queueing the correct records to the buffer @@ -103,12 +103,14 @@ def calculate_directional_fsic_diff_v2(fsic1, fsic2): device with fsic1 to the device with fsic2. 
FSIC v2 expanded format: {partition: {instance_id: counter}} - + :param fsic1: dict containing FSIC v2 in expanded format, for the sending device :param fsic2: dict containing FSIC v2 in expanded format, for the receiving device :return ``dict`` in expanded FSIC v2 format to be used in queueing the correct records to the buffer """ - prefixes = _build_prefix_mapper(list(fsic1.keys()) + list(fsic2.keys()), include_self=True) + prefixes = _build_prefix_mapper( + list(fsic1.keys()) + list(fsic2.keys()), include_self=True + ) result = defaultdict(dict) @@ -117,8 +119,10 @@ def calculate_directional_fsic_diff_v2(fsic1, fsic2): # check for counters in the sending FSIC that are higher than the receiving FSIC for inst, sending_counter in insts.items(): # get the maximum counter in the receiving FSIC for the same instance - receiving_counter = max(fsic2.get(prefix, {}).get(inst, 0) for prefix in prefixes[part]) + receiving_counter = max( + fsic2.get(prefix, {}).get(inst, 0) for prefix in prefixes[part] + ) if receiving_counter < sending_counter: result[part][inst] = receiving_counter - return dict(result) \ No newline at end of file + return dict(result) diff --git a/morango/sync/operations.py b/morango/sync/operations.py index 34afae7c..df954afc 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -1,4 +1,3 @@ -from curses import raw import functools import itertools import json diff --git a/tests/testapp/tests/sync/test_operations.py b/tests/testapp/tests/sync/test_operations.py index 9f5be780..eef9ebde 100644 --- a/tests/testapp/tests/sync/test_operations.py +++ b/tests/testapp/tests/sync/test_operations.py @@ -554,6 +554,35 @@ def test_soud_to_full_to_soud(self): assertRecordsNotBuffered(laptop_data_excluded) assertRecordsBuffered(tablet_data) + def test_subpartition_gets_included(self): + """ + Ensure that store records in subpartitions get included, but only if in the filter. 
+ """ + + self.initialize_sessions(filters="p1") + + store_tuples_included = [ + (self.i1, "p1a", 2), + (self.i1, "p1b", 5), + ] + store_tuples_excluded = [ + (self.i1, "p2", 5), + (self.i2, "p1", 2), + (self.i2, "p", 2), + ] + dmc_tuples = [ + (self.i1, "p1", 5), + (self.i2, "p2", 5), + ] + + data_included = self.create_stores(store_tuples_included) + data_excluded = self.create_stores(store_tuples_excluded) + + self.set_sender_fsic_from_dmcs(dmc_tuples) + self.set_receiver_fsic_from_dmcs([]) + self.queue() + assertRecordsBuffered(data_included) + assertRecordsNotBuffered(data_excluded) @override_settings(MORANGO_DESERIALIZE_AFTER_DEQUEUING=False) From 5f20c20a85745deb66d38fdbcf085f4df7740b32 Mon Sep 17 00:00:00 2001 From: Blaine Jester Date: Thu, 27 Jan 2022 08:28:06 -0800 Subject: [PATCH 4/8] Use custom error type for operations that don't handle a context --- morango/errors.py | 4 ++++ morango/sync/operations.py | 7 ++++--- morango/utils.py | 5 +++-- 3 files changed, 11 insertions(+), 5 deletions(-) diff --git a/morango/errors.py b/morango/errors.py index fed9b403..b56e8a38 100644 --- a/morango/errors.py +++ b/morango/errors.py @@ -72,3 +72,7 @@ class InvalidMorangoSourceId(MorangoError): class MorangoInvalidFSICPartition(MorangoError): pass + + +class MorangoSkipOperation(MorangoError): + pass diff --git a/morango/sync/operations.py b/morango/sync/operations.py index df954afc..b2ed57b2 100644 --- a/morango/sync/operations.py +++ b/morango/sync/operations.py @@ -23,6 +23,7 @@ from morango.errors import MorangoResumeSyncError from morango.errors import MorangoLimitExceeded from morango.errors import MorangoInvalidFSICPartition +from morango.errors import MorangoSkipOperation from morango.models.certificates import Filter from morango.models.core import Buffer from morango.models.core import DatabaseMaxCounter @@ -695,8 +696,8 @@ def __call__(self, context): raise NotImplementedError( "Transfer operation must return False, or a transfer status" ) - except AssertionError: - # if the operation raises an AssertionError, we equate that to returning False, which + except MorangoSkipOperation: + # if the operation raises an MorangoSkipOperation, we equate that to returning False, which # means that this operation did not handle it and so other operation instances should # be tried to handle it result = False @@ -714,7 +715,7 @@ def _assert(self, condition, message="Operation does not handle this condition") """ :param condition: a bool condition, if false will raise assertion error """ - _assert(condition, message) + _assert(condition, message, error_type=MorangoSkipOperation) class LocalOperation(BaseOperation): diff --git a/morango/utils.py b/morango/utils.py index 27550dfd..985c3685 100644 --- a/morango/utils.py +++ b/morango/utils.py @@ -122,10 +122,11 @@ def _windows_pid_exists(pid): pid_exists = _windows_pid_exists -def _assert(condition, message): +def _assert(condition, message, error_type=AssertionError): """ :param condition: A bool condition that if false will raise an AssertionError :param message: assertion error detail message + :param error_type: The type of error to raise """ if not condition: - raise AssertionError(message) + raise error_type(message) From 1201485231617f6c1232a4a79ab26af6c78715e6 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Thu, 27 Jan 2022 23:28:31 -0800 Subject: [PATCH 5/8] Ensure capabilities are marked on the local_context --- morango/sync/syncsession.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git 
a/morango/sync/syncsession.py b/morango/sync/syncsession.py index 7c428043..04e65ddd 100644 --- a/morango/sync/syncsession.py +++ b/morango/sync/syncsession.py @@ -594,10 +594,12 @@ def __init__(self, sync_connection, sync_session, controller): self.signals = SyncClientSignals() # TODO: come up with strategy to use only one context here - self.local_context = LocalSessionContext(sync_session=sync_session) self.remote_context = NetworkSessionContext( sync_connection, sync_session=sync_session ) + self.local_context = LocalSessionContext( + sync_session=sync_session, capabilities=self.remote_context.capabilities + ) def proceed_to_and_wait_for(self, stage): contexts = (self.local_context, self.remote_context) From eb6e00d872ba8951c0df4e15ed1ff789d65bf331 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Fri, 28 Jan 2022 20:17:18 -0800 Subject: [PATCH 6/8] Incorporate @bjester's optimizations to fsic calc --- morango/models/core.py | 51 +++++++++++++++++++++++------------------- 1 file changed, 28 insertions(+), 23 deletions(-) diff --git a/morango/models/core.py b/morango/models/core.py index 32abbf0a..94f40f2a 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -592,7 +592,9 @@ def update_fsics(cls, fsics, sync_filter, v2_format=False): for inst, counter in insts.items(): if internal_fsic.get("part", {}).get(inst, 0) < counter: DatabaseMaxCounter.objects.update_or_create( - instance_id=inst, partition=part, defaults={"counter": counter} + instance_id=inst, + partition=part, + defaults={"counter": counter}, ) else: updated_fsic = {} @@ -613,20 +615,27 @@ def update_fsics(cls, fsics, sync_filter, v2_format=False): ) @classmethod - def get_instance_counters_for_partition(cls, part, only_if_in_store=True): - """ - Return a dict of {instance_id: counter} for DMC's with the given partition. - If only_if_in_store is True, only return DMCs whose instance_id is still in the Store. 
- """ - queryset = cls.objects.filter(partition=part) + def get_instance_counters_for_partitions(cls, partitions, only_if_in_store=True): + queryset = cls.objects.filter(partition__in=partitions) if only_if_in_store: - matching_store_instances = ( - Store.objects.filter(partition__startswith=part) - .values_list("last_saved_instance") - .distinct() + matching_store_instances = Store.objects.filter( + partition__startswith=models.OuterRef("partition"), + last_saved_instance=models.OuterRef("instance_id"), + ).values("last_saved_instance") + # manually add EXISTS clause to avoid Django bug which puts `= TRUE` comparison that kills postgres perf + exists = models.Exists(matching_store_instances).resolve_expression( + query=queryset.query, allow_joins=True, reuse=None ) - queryset = queryset.filter(instance_id__in=matching_store_instances) - return dict(queryset.values_list("instance_id", "counter")) + queryset.query.where.add(exists, "AND") + + queryset = queryset.values_list("partition", "instance_id", "counter") + + counters = {} + for partition, instance_id, counter in queryset: + if partition not in counters: + counters[partition] = {} + counters[partition].update({instance_id: counter}) + return counters @classmethod def calculate_filter_specific_instance_counters( @@ -667,16 +676,12 @@ def calculate_filter_specific_instance_counters( ) # get the instance counters for the partitions, filtered (if requested) by instance_ids still used in the store - super_fsics = {} - for part in super_partitions: - super_fsics[part] = cls.get_instance_counters_for_partition( - part, only_if_in_store=only_if_in_store - ) - sub_fsics = {} - for part in sub_partitions: - sub_fsics[part] = cls.get_instance_counters_for_partition( - part, only_if_in_store=only_if_in_store - ) + super_fsics = cls.get_instance_counters_for_partitions( + super_partitions, only_if_in_store=only_if_in_store + ) + sub_fsics = cls.get_instance_counters_for_partitions( + sub_partitions, only_if_in_store=only_if_in_store + ) raw_fsic = { "super": super_fsics, From ad3aff92ded811820ca48806e30e2e9b2c8fde97 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Fri, 28 Jan 2022 22:54:32 -0800 Subject: [PATCH 7/8] Test for instance_id "shadowing" edge case, and address --- morango/models/core.py | 33 ++++++++++++------- .../tests/integration/test_syncsession.py | 33 +++++++++++++++++++ 2 files changed, 54 insertions(+), 12 deletions(-) diff --git a/morango/models/core.py b/morango/models/core.py index 94f40f2a..7e0de0d2 100644 --- a/morango/models/core.py +++ b/morango/models/core.py @@ -615,18 +615,28 @@ def update_fsics(cls, fsics, sync_filter, v2_format=False): ) @classmethod - def get_instance_counters_for_partitions(cls, partitions, only_if_in_store=True): + def get_instance_counters_for_partitions(cls, partitions, is_producer=False): queryset = cls.objects.filter(partition__in=partitions) - if only_if_in_store: - matching_store_instances = Store.objects.filter( + + # filter out instance_ids that are no longer relevant + if ( + is_producer + ): # for the producing side, we only need to use instance_ids that are still on Store records + matching_instances = Store.objects.filter( partition__startswith=models.OuterRef("partition"), last_saved_instance=models.OuterRef("instance_id"), ).values("last_saved_instance") - # manually add EXISTS clause to avoid Django bug which puts `= TRUE` comparison that kills postgres perf - exists = models.Exists(matching_store_instances).resolve_expression( - query=queryset.query, allow_joins=True, 
reuse=None - ) - queryset.query.where.add(exists, "AND") + else: # for the receiving side, we include all instances that still exist in the RecordMaxCounters + matching_instances = RecordMaxCounter.objects.filter( + store_model__partition__startswith=models.OuterRef("partition"), + instance_id=models.OuterRef("instance_id"), + ).values("instance_id") + + # manually add EXISTS clause to avoid Django bug which puts `= TRUE` comparison that kills postgres perf + exists = models.Exists(matching_instances).resolve_expression( + query=queryset.query, allow_joins=True, reuse=None + ) + queryset.query.where.add(exists, "AND") queryset = queryset.values_list("partition", "instance_id", "counter") @@ -643,7 +653,6 @@ def calculate_filter_specific_instance_counters( filters, is_producer=False, v2_format=False, - only_if_in_store=True, ): if v2_format: @@ -675,12 +684,12 @@ def calculate_filter_specific_instance_counters( qs.values_list("partition", flat=True).distinct() ) - # get the instance counters for the partitions, filtered (if requested) by instance_ids still used in the store + # get the instance counters for the partitions, also filtering out old unnecessary instance_ids super_fsics = cls.get_instance_counters_for_partitions( - super_partitions, only_if_in_store=only_if_in_store + super_partitions, is_producer=is_producer ) sub_fsics = cls.get_instance_counters_for_partitions( - sub_partitions, only_if_in_store=only_if_in_store + sub_partitions, is_producer=is_producer ) raw_fsic = { diff --git a/tests/testapp/tests/integration/test_syncsession.py b/tests/testapp/tests/integration/test_syncsession.py index 146fd2b7..d6cb4fbc 100644 --- a/tests/testapp/tests/integration/test_syncsession.py +++ b/tests/testapp/tests/integration/test_syncsession.py @@ -266,6 +266,39 @@ def test_full_flow_and_repeat(self): transfer_session = second_pull_client.local_context.transfer_session self.assertEqual(0, transfer_session.records_total) + def test_second_pull_with_instance_id_no_longer_in_store(self): + with second_environment(): + SummaryLog.objects.create(user=self.remote_user) + summ_log_id = SummaryLog.objects.first().id + + self.assertEqual(0, SummaryLog.objects.filter(id=summ_log_id).count()) + + # first pull + pull_client = self.client.get_pull_client() + pull_client.initialize(self.filter) + transfer_session = pull_client.local_context.transfer_session + self.assertEqual(1, transfer_session.records_total) + self.assertEqual(0, transfer_session.records_transferred) + pull_client.run() + self.assertEqual(1, transfer_session.records_transferred) + pull_client.finalize() + + # sanity check pull worked + self.assertEqual(1, SummaryLog.objects.filter(id=summ_log_id).count()) + + # update the log record locally + SummaryLog.objects.filter(id=summ_log_id).update(content_id="a" * 32) + + # now start a push, to serialize the local record, but don't actually push + second_push_client = self.client.get_push_client() + second_push_client.initialize(self.filter) + + # now do another pull, which shouldn't have anything new to bring down + second_pull_client = self.client.get_pull_client() + second_pull_client.initialize(self.filter) + transfer_session = second_pull_client.local_context.transfer_session + self.assertEqual(0, transfer_session.records_total) + def test_resume(self): # create data for _ in range(5): From 5be6f0a43541bd6758733403a9960aecac6c6cd1 Mon Sep 17 00:00:00 2001 From: Jamie Alexandre Date: Mon, 31 Jan 2022 10:27:58 -0800 Subject: [PATCH 8/8] Bump to new 0.6.8 alpha version --- 
morango/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/morango/__init__.py b/morango/__init__.py index 5c882840..7377bd74 100644 --- a/morango/__init__.py +++ b/morango/__init__.py @@ -3,4 +3,4 @@ from __future__ import unicode_literals default_app_config = "morango.apps.MorangoConfig" -__version__ = "0.6.7" +__version__ = "0.6.8a0"
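
To illustrate the v2 FSIC format that these patches introduce: counters are grouped per partition under "sub" (partitions covered by the sync filter) and "super" (prefixes of those partitions), and the directional diff keeps only the (instance_id, counter) pairs that the receiving side has not yet seen. The sketch below is a simplified standalone illustration, not code from the patches; the partition names, instance ids, and counters are invented, and unlike calculate_directional_fsic_diff_v2 it only compares exact partition matches rather than also consulting prefix (super) partitions via _build_prefix_mapper.

    # Standalone sketch (not the library implementation) of the v2 FSIC shape
    # and the directional diff it enables. All values below are illustrative.
    from collections import defaultdict

    # FSIC v2 wire format: counters grouped per partition, split into "sub"
    # partitions (under the sync filter) and "super" partitions (prefixes).
    client_fsic = {
        "super": {},
        "sub": {
            "p1": {"instance_a": 5, "instance_b": 2},
        },
    }
    server_fsic = {
        "super": {},
        "sub": {
            "p1": {"instance_a": 3},
        },
    }

    def directional_diff_v2(sending, receiving):
        """Return {partition: {instance_id: lower_bound_counter}} for data the
        sender has that the receiver has not yet seen (simplified: exact
        partition matches only)."""
        result = defaultdict(dict)
        for part, insts in sending.items():
            for inst, send_counter in insts.items():
                recv_counter = receiving.get(part, {}).get(inst, 0)
                if recv_counter < send_counter:
                    result[part][inst] = recv_counter
        return dict(result)

    # Sending from client to server: instance_a advanced past the server's
    # counter (5 > 3) and instance_b is unknown to the server, so both are
    # included, each with the receiver's current counter as the lower bound.
    print(directional_diff_v2(client_fsic["sub"], server_fsic["sub"]))
    # {'p1': {'instance_a': 3, 'instance_b': 0}}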