Skip to content

Commit

Permalink
Remove logic for increasing the batch size
Browse files Browse the repository at this point in the history
a
  • Loading branch information
drew2a committed Jan 31, 2022
1 parent 3800707 commit d8244a2
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@ def get_entries_count(self, **kwargs):
return self.get_entries_query(**kwargs).count()

@db_session
def get_max_rowid(self):
def get_max_rowid(self) -> int:
    """Return the largest rowid among all ChannelNode rows, or 0 for an empty table.

    The `or 0` guards against Pony's `select(...).get()` returning None when the
    table has no rows. Callers use this as the upper bound for batch processing.
    """
    return select(max(obj.rowid) for obj in self.ChannelNode).get() or 0

fts_keyword_search_re = re.compile(r'\w+', re.UNICODE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@
import tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata as torrent_metadata
import tribler_core.components.metadata_store.db.store as MDS
from tribler_core.components.metadata_store.db.serialization import REGULAR_TORRENT
from tribler_core.components.tag.db.tag_db import (
TagDatabase,
)
from tribler_core.components.tag.db.tag_db import TagDatabase
from tribler_core.components.tag.rules.tag_rules import extract_only_valid_tags
from tribler_core.notifier import Notifier

Expand Down Expand Up @@ -40,9 +38,18 @@ def __init__(self, notifier: Notifier, db: TagDatabase, mds: MDS.MetadataStore,
self.interval = interval
self.notifier.add_observer(torrent_metadata.NEW_TORRENT_METADATA_CREATED,
callback=self.process_torrent_title)
self.register_task(name=self.process_batch.__name__,
interval=interval,
task=self.process_batch)
@db_session
def start(self):
    """Schedule periodic batch processing unless all torrents are already processed.

    Compares the last processed torrent id against the current maximum rowid in
    the metadata store; if unprocessed rows remain, registers `process_batch`
    to run every `self.interval` seconds.
    """
    # F541 fix: no placeholders in the message, so a plain string literal suffices.
    self.logger.info('Start')

    max_row_id = self.mds.get_max_rowid()
    is_finished = self.get_last_processed_torrent_id() >= max_row_id

    if not is_finished:
        self.logger.info(f'Register process_batch task with interval: {self.interval} sec')
        self.register_task(name=self.process_batch.__name__,
                           interval=self.interval,
                           task=self.process_batch)

@db_session
def process_batch(self) -> int:
Expand All @@ -51,8 +58,9 @@ def query(_start, _end):
t.metadata_type == REGULAR_TORRENT and \
t.tag_version < self.version

start = int(self.mds.get_value(LAST_PROCESSED_TORRENT_ID, default='0'))
end = start + self.batch_size
start = self.get_last_processed_torrent_id()
max_row_id = self.mds.get_max_rowid()
end = min(start + self.batch_size, max_row_id)
self.logger.info(f'Processing batch [{start}...{end}]')

batch = self.mds.TorrentMetadata.select(query(start, end))
Expand All @@ -63,14 +71,13 @@ def query(_start, _end):
torrent.tag_version = self.version
processed += 1

self.mds.set_value(LAST_PROCESSED_TORRENT_ID, str(end))
self.logger.info(f'Processed: {processed} titles. Added {added} tags.')
max_row_id = self.mds.get_max_rowid()

is_beyond_the_boundary = end > max_row_id
if is_beyond_the_boundary:
self._schedule_new_process_batch_round()
else:
self.mds.set_value(LAST_PROCESSED_TORRENT_ID, str(end))
is_finished = end >= max_row_id
if is_finished:
self.logger.info(f'Finish batch processing, cancel process_batch task')
self.cancel_pending_task(name=self.process_batch.__name__)
return processed

def process_torrent_title(self, infohash: Optional[bytes] = None, title: Optional[str] = None) -> int:
Expand All @@ -87,14 +94,5 @@ def save_tags(self, infohash: bytes, tags: Set[str]):
for tag in tags:
self.db.add_auto_generated_tag(infohash=infohash, tag=tag)

def _schedule_new_process_batch_round(self):
self.logger.info('All items in TorrentMetadata have been processed.')
self.mds.set_value(LAST_PROCESSED_TORRENT_ID, '0')
self.logger.info('Set last_processed_torrent_id to 0')
self.interval *= 2
self.logger.info(f'Double the interval. New interval: {self.interval}')
self.batch_size *= 2
self.logger.info(f'Double the batch size. New batch size: {self.batch_size}')
self.replace_task(self.process_batch.__name__,
interval=self.interval,
task=self.process_batch)
def get_last_processed_torrent_id(self) -> int:
    """Return the id of the last torrent row handled by the processor.

    The value is persisted in the metadata store as a string; a missing entry
    is treated as '0' (nothing processed yet).
    """
    stored_value = self.mds.get_value(LAST_PROCESSED_TORRENT_ID, default='0')
    return int(stored_value)
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import pytest

from tribler_core.components.metadata_store.db.orm_bindings.torrent_metadata import NEW_TORRENT_METADATA_CREATED
from tribler_core.components.tag.community.tag_payload import TagOperation, TagOperationEnum
from tribler_core.components.tag.db.tag_db import CLOCK_START_VALUE, \
PUBLIC_KEY_FOR_AUTO_GENERATED_TAGS
from tribler_core.components.tag.rules.tag_rules_processor import LAST_PROCESSED_TORRENT_ID, TagRulesProcessor

TEST_BATCH_SIZE = 100
Expand Down Expand Up @@ -55,15 +52,6 @@ def test_save_tags(tag_rules_processor: TagRulesProcessor):
assert [c for c in actual_calls if c not in expected_calls] == []


@patch.object(TagRulesProcessor, 'replace_task')
def test_schedule_new_process_batch_round(mocked_replace_task: Mock, tag_rules_processor: TagRulesProcessor):
tag_rules_processor._schedule_new_process_batch_round()
assert tag_rules_processor.interval == TEST_INTERVAL * 2
assert tag_rules_processor.batch_size == TEST_BATCH_SIZE * 2
tag_rules_processor.mds.set_value.assert_called_with(LAST_PROCESSED_TORRENT_ID, '0')
mocked_replace_task.assert_called_once()


@patch.object(TagRulesProcessor, 'process_torrent_title', new=Mock(return_value=1))
def test_process_batch_within_the_boundary(tag_rules_processor: TagRulesProcessor):
# test the inner logic of `process_batch` when the batch lies entirely within the boundary
Expand All @@ -85,22 +73,22 @@ def select(_):
tag_rules_processor.mds.set_value.assert_called_with(LAST_PROCESSED_TORRENT_ID, str(TEST_BATCH_SIZE))


@patch.object(TagRulesProcessor, '_schedule_new_process_batch_round')
@patch.object(TagRulesProcessor, 'process_torrent_title', new=Mock(return_value=1))
def test_process_batch_beyond_the_boundary(mocked_schedule_new_process_batch_round: Mock,
tag_rules_processor: TagRulesProcessor):
# test inner logic of `process_batch` in case this batch located within the boundary
def test_process_batch_beyond_the_boundary(tag_rules_processor: TagRulesProcessor):
# test the inner logic of `process_batch` when the batch crosses the max-rowid boundary
returned_batch_size = TEST_BATCH_SIZE // 2 # let's return a half of requested items

# let's specify `max_rowid` in such a way that it is less than end of the current batch
max_rowid = returned_batch_size // 2

def select(_):
return [SimpleNamespace(infohash=i, title=i) for i in range(returned_batch_size)]

tag_rules_processor.mds.get_value = lambda *_, **__: 0 # let's start from 0 for LAST_PROCESSED_TORRENT_ID
tag_rules_processor.mds.TorrentMetadata.select = select

# let's specify `max_rowid` in such a way that it is less than end of the current batch
tag_rules_processor.mds.get_max_rowid = lambda: returned_batch_size // 2
tag_rules_processor.mds.get_max_rowid = lambda: max_rowid

# assert that actually returned count of processed items is equal to `returned_batch_size`
# assert that actually returned count of processed items is equal to `max_rowid`
assert tag_rules_processor.process_batch() == returned_batch_size
mocked_schedule_new_process_batch_round.assert_called_once()
tag_rules_processor.mds.set_value.assert_called_with(LAST_PROCESSED_TORRENT_ID, str(max_rowid))
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ async def run(self):
db=self.tags_db,
mds=mds_component.mds,
)
self.rules_processor.start()

self._ipv8_component.initialise_community_by_default(self.community)

Expand Down

0 comments on commit d8244a2

Please sign in to comment.