fix: re-check ongoing/unreleased items #880
Conversation
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
src/program/program.py (1)
Lines 230-245: Consider optimizing database operations and error handling

The current implementation has two potential areas for improvement:
- Database Performance: The commit operation inside the loop could be inefficient for large datasets.
- Error Handling: The catch-all exception block could mask specific issues.
Consider these improvements:
 counter = 0
+updated_items = []
 for item_id in item_ids:
     try:
         item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
         if item:
             previous_state = item.last_state
             item.store_state()
             if previous_state != item.last_state:
                 self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
                 logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state} to {item.last_state}")
                 counter += 1
-            session.merge(item)
-            session.commit()
+            updated_items.append(item)
-    except Exception as e:
+    except SQLAlchemyError as e:
         logger.error(f"Failed to update state for item with ID {item_id}: {e}")
+    except Exception as e:
+        logger.error(f"Unexpected error updating state for item with ID {item_id}: {e}")
+
+try:
+    if updated_items:
+        for item in updated_items:
+            session.merge(item)
+        session.commit()
+except SQLAlchemyError as e:
+    logger.error(f"Failed to commit state updates: {e}")
+    session.rollback()
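The batched-commit idea in the suggestion above can be sketched in isolation. The session object here is a stand-in so the control flow is easy to test; FakeSession and update_states are illustrative names, not part of the PR.

```python
class FakeSession:
    """Stand-in for a SQLAlchemy session that records merge/commit calls."""
    def __init__(self):
        self.merged = []
        self.commits = 0

    def merge(self, item):
        self.merged.append(item)

    def commit(self):
        self.commits += 1


def update_states(session, items, batch_size=100):
    """Merge every item, but commit once per batch instead of once per item."""
    counter = 0
    for item in items:
        session.merge(item)
        counter += 1
        if counter % batch_size == 0:
            session.commit()
    if counter % batch_size != 0:  # flush the final partial batch
        session.commit()
    return counter
```

With 250 items and a batch size of 100, this issues three commits (two full batches plus the remainder) instead of 250.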
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
⛔ Files ignored due to path filters (1)
poetry.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
src/program/program.py (2 hunks)
🔇 Additional comments (3)
src/program/program.py (3)
Line 42: LGTM: Import optimization
The SQLAlchemy imports have been correctly simplified by removing unused functions.
Lines 215-216: LGTM: Improved state management logic
The renamed method better reflects its purpose, and the simplified query logic focuses on state-based filtering. The debug logging provides better visibility into the operation.
Also applies to: 218-225, 228-228
Line 251: LGTM: Updated scheduler configuration
The scheduling configuration has been correctly updated to use the renamed method while maintaining the same interval.
Actionable comments posted: 2
🧹 Outside diff range and nitpick comments (1)
src/program/program.py (1)
Lines 224-228: Enhance logging for better observability

The debug logs could be more informative by including additional context about the operation.
Here's a suggested improvement:
 if not item_ids:
-    logger.debug("PROGRAM", "No ongoing or unreleased items to update.")
+    logger.debug("PROGRAM", "No ongoing or unreleased items (movies/episodes) found to update.")
     return
-logger.debug(f"Updating state for {len(item_ids)} ongoing and unreleased items.")
+logger.debug(f"Starting state update for {len(item_ids)} ongoing/unreleased items (movies/episodes).")
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
src/program/program.py (2 hunks)
🔇 Additional comments (2)
src/program/program.py (2)
Line 42: LGTM!
The SQLAlchemy imports are correctly specified and used throughout the file.
Line 251: LGTM!
The scheduling configuration correctly reflects the renamed method while maintaining the same execution parameters.
except Exception as e:
    logger.error(f"Failed to reindex item with ID {item_id}: {e}")
    logger.log("PROGRAM", "Reindexing completed.")
    logger.error(f"Failed to update state for item with ID {item_id}: {e}")
🛠️ Refactor suggestion
Enhance error handling for database consistency
The current error handling catches exceptions but continues processing, which might leave the database in an inconsistent state. Consider rolling back the session on error and implementing retry logic.
Here's a suggested improvement:
try:
item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
if item:
previous_state = item.last_state
item.store_state()
if previous_state != item.last_state:
self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state} to {item.last_state}")
counter += 1
session.merge(item)
except Exception as e:
+ session.rollback()
logger.error(f"Failed to update state for item with ID {item_id}: {e}")
+ # Optionally, implement retry logic here
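The comment above leaves the retry logic as an exercise. One hedged sketch of a bounded-retry helper (all names here are illustrative, not part of the PR; real code would catch SQLAlchemyError rather than Exception) could look like:

```python
import time


def with_retries(fn, attempts=3, delay=0.0, on_error=None):
    """Call fn(); on exception, retry up to `attempts` times total.

    `delay` seconds are slept between tries; `on_error(attempt, exc)` gives
    the caller a hook to roll back the session before the next attempt.
    The last exception is re-raised if every attempt fails.
    """
    last_exc = None
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except Exception as exc:  # narrow this to SQLAlchemyError in real code
            last_exc = exc
            if on_error:
                on_error(attempt, exc)
            if attempt < attempts:
                time.sleep(delay)
    raise last_exc
```

The per-item update would be wrapped as `with_retries(lambda: update_item(item_id), on_error=lambda n, e: session.rollback())`, so each failed attempt rolls the session back before retrying.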
Committable suggestion skipped: line range outside the PR's diff.
src/program/program.py
Outdated
for item_id in item_ids:
    try:
        item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
        if item:
            for indexed_item in TraktIndexer().run(item):
                indexed_item.store_state()
                session.merge(indexed_item)
                logger.debug(f"Reindexed {indexed_item.log_string} ({indexed_item.id})")
            previous_state = item.last_state
            item.store_state()
            if previous_state != item.last_state:
                self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
                logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state} to {item.last_state}")
                counter += 1
            session.merge(item)
            session.commit()
    except Exception as e:
🛠️ Refactor suggestion
Optimize database operations by batching commits
The current implementation commits changes for each item individually, which can be inefficient for large datasets. Consider batching the commits to improve performance.
Here's a suggested optimization:
counter = 0
+batch_size = 100
for item_id in item_ids:
try:
item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
if item:
previous_state = item.last_state
item.store_state()
if previous_state != item.last_state:
self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state} to {item.last_state}")
counter += 1
session.merge(item)
- session.commit()
+ if counter % batch_size == 0:
+ session.commit()
+session.commit() # Commit any remaining changes
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
counter = 0
batch_size = 100
for item_id in item_ids:
    try:
        item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
        if item:
            previous_state = item.last_state
            item.store_state()
            if previous_state != item.last_state:
                self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
                logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state} to {item.last_state}")
                counter += 1
            session.merge(item)
            if counter % batch_size == 0:
                session.commit()
    except Exception as e:
        logger.error(f"Failed to update state for item with ID {item_id}: {e}")
session.commit()  # Commit any remaining changes
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (4)
src/program/program.py (1)
Line 225: Standardize debug log formatting

The debug logging is inconsistent:
- Line 225 uses logger.debug("PROGRAM", ...)
- Line 228 uses a direct f-string
- Line 245 uses logger.debug("PROGRAM", ...)

Standardize the debug logging format:

-logger.debug("PROGRAM", "No ongoing or unreleased items to update.")
+logger.debug(f"No ongoing or unreleased items to update.")
-logger.debug(f"Updating state for {len(item_ids)} ongoing and unreleased items.")
+logger.debug(f"Updating state for {len(item_ids)} ongoing and unreleased items.")
-logger.debug("PROGRAM", f"Found {counter} items with updated state.")
+logger.debug(f"Found {counter} items with updated state.")

Also applies to: 228-228, 245-245
src/program/media/item.py (3)
Lines 144-150: LGTM! Consider enhancing the docstring.

The return type change and state transition tracking is a good improvement. Consider updating the docstring to document the return value.

- """Store the state of the item."""
+ """Store the state of the item.
+
+ Returns:
+     tuple[States, States]: A tuple containing (last_state, new_state)
+ """
Line range hint 507-510: Fix return type annotation.

The return type annotation should match the superclass's return type.

- def store_state(self, given_state: States =None) -> None:
+ def store_state(self, given_state: States =None) -> tuple[States, States]:
Line range hint 587-590: Fix return type annotation.

The return type annotation should match the superclass's return type.

- def store_state(self, given_state: States = None) -> None:
+ def store_state(self, given_state: States = None) -> tuple[States, States]:
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
src/program/media/item.py (1 hunks)
src/program/program.py (2 hunks)
🔇 Additional comments (4)
src/program/program.py (3)
Line 42: LGTM: Import statement is properly focused
The SQLAlchemy import statement has been simplified to include only the necessary components.
Lines 249-250: LGTM: Scheduler configuration properly updated
The scheduled function has been correctly updated to use the renamed method while maintaining the same interval and configuration.
Lines 218-223: Verify the impact of changing item type filter

The filter has been changed from ["movie", "show"] to ["movie", "episode"]. This change might miss updating the state of shows that need to be processed.
src/program/media/item.py (1)
Lines 144-150: Verify callers of store_state method.

The return type change from None to tuple[States, States] is a breaking change. Let's verify all callers of this method to ensure they handle the return value correctly.
✅ Verification successful
Let me analyze the return value usage from the search results:
- Most calls to store_state() don't use the return value, except for one in src/program/program.py:
previous_state, new_state = item.store_state()
if previous_state != new_state:
self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
Let me verify if this was a recent change by checking the git history:
From the git history and previous search results, I can now generate the final response:
Return type change in store_state() is safe

The change from None to tuple[States, States] is safe because:
- Most callers ignore the return value and only care about the side effects
- The only caller using the return value (src/program/program.py) was recently updated to use the new tuple return type correctly
- All other calls are followed by either session.merge(), session.add(), or session.commit(), indicating they only care about the state being stored
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Search for all calls to store_state method
echo "Searching for store_state method calls..."
rg -A 2 "\.store_state\(" --type py
Length of output: 3588
Script:
#!/bin/bash
# Check git history for the return value usage
git log -p src/program/program.py | grep -A 3 "store_state()"
Length of output: 6412
counter = 0
for item_id in item_ids:
    try:
        item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
        if item:
            for indexed_item in TraktIndexer().run(item):
                indexed_item.store_state()
                session.merge(indexed_item)
                logger.debug(f"Reindexed {indexed_item.log_string} ({indexed_item.id})")
            previous_state, new_state = item.store_state()
            if previous_state != new_state:
                self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
                logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state.name} to {new_state.name}")
                counter += 1
            session.merge(item)
            session.commit()
    except Exception as e:
        logger.error(f"Failed to reindex item with ID {item_id}: {e}")
        logger.log("PROGRAM", "Reindexing completed.")
        logger.error(f"Failed to update state for item with ID {item_id}: {e}")
🛠️ Refactor suggestion
Optimize database operations and enhance error handling
The current implementation has several areas for improvement:
- Individual commits for each item can impact performance
- No transaction rollback on error
- No batch processing
Consider this optimized implementation:
counter = 0
+batch_size = 100 # Adjust based on your needs
for item_id in item_ids:
+ if counter > 0 and counter % batch_size == 0:
+ try:
+ session.commit()
+ except Exception as e:
+ session.rollback()
+ logger.error(f"Failed to commit batch: {e}")
+ continue
try:
item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
if item:
previous_state, new_state = item.store_state()
if previous_state != new_state:
self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state.name} to {new_state.name}")
counter += 1
session.merge(item)
- session.commit()
except Exception as e:
+ session.rollback()
logger.error(f"Failed to update state for item with ID {item_id}: {e}")
+try:
+ session.commit() # Commit any remaining items
+except Exception as e:
+ session.rollback()
+ logger.error(f"Failed to commit final batch: {e}")
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
counter = 0
batch_size = 100  # Adjust based on your needs
for item_id in item_ids:
    if counter > 0 and counter % batch_size == 0:
        try:
            session.commit()
        except Exception as e:
            session.rollback()
            logger.error(f"Failed to commit batch: {e}")
            continue
    try:
        item = session.execute(select(MediaItem).filter_by(id=item_id)).unique().scalar_one_or_none()
        if item:
            previous_state, new_state = item.store_state()
            if previous_state != new_state:
                self.em.add_event(Event(emitted_by="UpdateOngoing", item_id=item_id))
                logger.debug(f"Updated state for {item.log_string} ({item.id}) from {previous_state.name} to {new_state.name}")
                counter += 1
            session.merge(item)
    except Exception as e:
        session.rollback()
        logger.error(f"Failed to update state for item with ID {item_id}: {e}")
try:
    session.commit()  # Commit any remaining items
except Exception as e:
    session.rollback()
    logger.error(f"Failed to commit final batch: {e}")
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
src/program/media/item.py (2)
Line range hint 516-519: Add return type annotation and handle return value in Show.store_state.

The method should maintain type consistency with the parent class and potentially use the return values from child state updates.

Consider this enhancement:

- def store_state(self, given_state: States =None) -> None:
+ def store_state(self, given_state: States = None) -> tuple[States, States]:
      for season in self.seasons:
          season.store_state(given_state)
-     super().store_state(given_state)
+     return super().store_state(given_state)
Lines 144-151: Add state validation and transition logging.

Consider adding validation for the given_state parameter and logging state transitions for better debugging capabilities.
Here's a suggested enhancement:
 def store_state(self, given_state=None) -> tuple[States, States]:
     """Store the state of the item."""
+    if given_state is not None and not isinstance(given_state, States):
+        raise ValueError(f"Invalid state type: {type(given_state)}. Expected: States")
     previous_state = self.last_state
     new_state = given_state if given_state else self._determine_state()
     if previous_state != new_state:
         sse_manager.publish_event("item_update", {"last_state": previous_state, "new_state": new_state, "item_id": self.id})
+        logger.debug(f"State transition for {self.log_string}: {previous_state} -> {new_state}")
     self.last_state = new_state
     return (previous_state, new_state)
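As a standalone illustration of the isinstance guard suggested above (the States enum here is a minimal stand-in, not the project's real class, and validate_state is a hypothetical helper):

```python
from enum import Enum


class States(Enum):
    """Minimal stand-in for the project's States enum."""
    Unreleased = "Unreleased"
    Ongoing = "Ongoing"
    Completed = "Completed"


def validate_state(given_state):
    """Reject anything that is not a States member; None means 'determine it'."""
    if given_state is not None and not isinstance(given_state, States):
        raise ValueError(f"Invalid state type: {type(given_state)}. Expected: States")
    return given_state
```

The guard catches a common mistake: passing the string "Ongoing" instead of States.Ongoing, which would otherwise be stored silently and break later comparisons.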
 def store_state(self, given_state=None) -> tuple[States, States]:
     """Store the state of the item."""
     previous_state = self.last_state
     new_state = given_state if given_state else self._determine_state()
-    if self.last_state and self.last_state != new_state:
-        sse_manager.publish_event("item_update", {"last_state": self.last_state, "new_state": new_state, "item_id": self.id})
+    if previous_state and previous_state != new_state:
+        sse_manager.publish_event("item_update", {"last_state": previous_state, "new_state": new_state, "item_id": self.id})
     self.last_state = new_state
     return (previous_state, new_state)
🛠️ Refactor suggestion
Improve event publishing logic for initial state transitions.
The condition if previous_state and previous_state != new_state might skip event publishing when previous_state is None, which could be important for tracking initial state assignments.
Consider this alternative implementation:
def store_state(self, given_state=None) -> tuple[States, States]:
"""Store the state of the item."""
previous_state = self.last_state
new_state = given_state if given_state else self._determine_state()
- if previous_state and previous_state != new_state:
+ if previous_state != new_state:
sse_manager.publish_event("item_update", {"last_state": previous_state, "new_state": new_state, "item_id": self.id})
self.last_state = new_state
return (previous_state, new_state)
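The behavioral difference flagged here can be checked in isolation. The States enum and the helper below are illustrative stand-ins, not project code: include_initial=False mimics the truthiness-guarded condition, include_initial=True mimics the suggested plain inequality.

```python
from enum import Enum


class States(Enum):
    Requested = "Requested"
    Ongoing = "Ongoing"


def transitions_to_publish(pairs, include_initial):
    """Return the (previous, new) pairs that would trigger an event."""
    published = []
    for previous_state, new_state in pairs:
        if include_initial:
            fire = previous_state != new_state  # None -> X also fires
        else:
            fire = bool(previous_state) and previous_state != new_state  # None -> X is skipped
        if fire:
            published.append((previous_state, new_state))
    return published
```

With a None previous state, only the plain inequality publishes the initial assignment; the truthiness guard drops it, which is exactly the gap the review points out.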
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def store_state(self, given_state=None) -> tuple[States, States]:
    """Store the state of the item."""
    previous_state = self.last_state
    new_state = given_state if given_state else self._determine_state()
    if previous_state != new_state:
        sse_manager.publish_event("item_update", {"last_state": previous_state, "new_state": new_state, "item_id": self.id})
    self.last_state = new_state
    return (previous_state, new_state)
Pull Request Check List
Resolves: #issue-number-here
Description:
Summary by CodeRabbit
New Features
Bug Fixes
Refactor