
setup job for pollution predictions using satellite data #3343

Merged: 26 commits into airqo-platform:staging on Oct 11, 2024

Conversation

A7med7x7
Contributor

@A7med7x7 A7med7x7 commented Aug 22, 2024

WHAT DOES THIS PR DO?

  • Set up DAG to make PM2.5 predictions using satellite data
  • Some code cleanup and refactoring existing ML code

HOW DO I TEST OUT THIS PR?

ARE THERE ANY RELATED PRs?

  • Related PR 1
  • Related PR 2

Summary by CodeRabbit

  • New Features

    • Enhanced satellite data fetching capabilities with new methods.
    • Introduced utilities for machine learning tasks related to forecasting and fault detection.
    • Added new DAGs for improved data processing and model training.
  • Bug Fixes

    • Updated test cases to align with recent changes and ensure functionality.
  • Documentation

    • Improved clarity in method naming and configuration settings for better user understanding.

Contributor

coderabbitai bot commented Aug 22, 2024

📝 Walkthrough

The changes in this pull request introduce enhancements to the BigQueryApi class, configuration settings, machine learning utilities, and DAGs within the AirQo ETL framework. New methods for fetching satellite data are added, existing methods are renamed for clarity, and new utility classes are introduced. Additionally, several new DAGs are created for forecasting and satellite model training, while redundant files are removed. Test cases have been updated to reflect these changes, ensuring consistency across the codebase.

Changes

File/Path Change Summary
src/workflows/airqo_etl_utils/bigquery_api.py Added methods for fetching satellite data, renamed existing method, added new attribute.
src/workflows/airqo_etl_utils/config.py Introduced new environment variables for satellite data configuration.
src/workflows/airqo_etl_utils/ml_utils.py Renamed MlUtils to BaseMlUtils, added new utility classes and methods for feature extraction.
src/workflows/airqo_etl_utils/satellite_utils.py Introduced SatelliteMLUtils with methods for preprocessing and training satellite models.
src/workflows/airqo_etl_utils/tests/test_big_query_api.py Updated test cases for method renaming and restructuring.
src/workflows/airqo_etl_utils/tests/test_ml_utils.py Updated test cases for new utility classes and methods.
src/workflows/dags/fault_detection_job.py Updated DAG to utilize FaultDetectionUtils.
src/workflows/dags/forecast_prediction_jobs.py New DAGs added for forecasting jobs.
src/workflows/dags/forecast_training_jobs.py New DAGs added for training jobs.
src/workflows/dags/ml_prediction_jobs.py Removed old DAG with redundant functionality.
src/workflows/dags/satellite_training.py New DAG and tasks for satellite model training.

Possibly related PRs

Suggested reviewers

  • Baalmart
  • Psalmz777

🌟 In the realm of data, where satellites soar,
New methods and classes, we now can explore.
With configurations set and DAGs in a row,
AirQo's ETL shines, watch it grow!
Testing is updated, all changes aligned,
In this code journey, new paths we find! 🌟



@Mnoble-19 Mnoble-19 changed the title from "initial satellite code" to "setup job for pollution predictions using satellite data" on Sep 17, 2024
@Mnoble-19 Mnoble-19 marked this pull request as ready for review October 11, 2024 08:12
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 23

🧹 Outside diff range and nitpick comments (20)
src/workflows/airqo_etl_utils/tests/test_big_query_api.py (4)

88-88: Excellent method renaming for clarity!

The change from fetch_data to fetch_device_data_for_forecast_job is a great improvement in terms of specificity and clarity. It now clearly indicates the purpose and context of the method being tested.

To further enhance the test's readability, consider updating the test function name to reflect this change:

def test_fetch_device_data_for_forecast_job_correct_scenarios(mock_bigquery_client, start_date_time, expected_df):

This adjustment would make the test name consistent with the method it's testing, improving overall code coherence.


100-100: Consistent method name update in invalid date test.

The change from fetch_data to fetch_device_data_for_forecast_job has been correctly applied in this test case, maintaining consistency with the method rename.

For improved clarity and consistency with the method being tested, consider updating the test function name:

def test_fetch_device_data_for_forecast_job_invalid_date(mock_bigquery_client, start_date_time):

This change would make it immediately clear which method is being tested and under what conditions, enhancing the test suite's readability.


112-112: Method name consistently updated in BigQuery error test.

The change from fetch_data to fetch_device_data_for_forecast_job has been correctly applied in this test case, maintaining consistency across all test functions.

To improve clarity and specificity, consider the following suggestions:

  1. Update the test function name to reflect the specific method and scenario being tested:
def test_fetch_device_data_for_forecast_job_bigquery_error(mock_bigquery_client, start_date_time):
  1. Instead of catching a generic Exception, consider catching a more specific exception that might be raised by BigQuery API errors. This would make the test more robust and informative:
from google.api_core import exceptions as google_exceptions

with pytest.raises(google_exceptions.GoogleAPIError):
    bq_api.fetch_device_data_for_forecast_job(start_date_time, "train")

These changes would enhance the test's readability and make it more precise in terms of the errors it's expecting to handle.


Line range hint 1-124: Overall assessment: Positive changes with room for minor enhancements

The updates to this test file are consistent and improve the clarity of the tests by renaming the fetch_data method to the more specific fetch_device_data_for_forecast_job. This change aligns well with the PR objectives of refactoring and improving code readability.

Key observations:

  1. The method rename is consistently applied across all relevant test functions.
  2. The overall structure and purpose of the tests remain intact, preserving test coverage.
  3. The test_fetch_raw_readings_empty function remains unchanged, which is correct as it tests a different method.

Suggestions for further improvement:

  1. Update test function names to reflect the new method name, enhancing readability and maintainability.
  2. Consider using more specific exception types in error-related tests.

These changes contribute positively to the codebase by making the tests more descriptive and aligned with the actual implementation. The suggestions provided in the individual comments, if implemented, would further enhance the quality and clarity of the test suite.

src/workflows/airqo_etl_utils/tests/test_ml_utils.py (2)

4-4: Excellent update to the import statement.

The change from MlUtils to BaseMlUtils aligns well with the refactoring mentioned in the summary. This update ensures consistency with the main codebase.

As a minor suggestion, consider updating the alias FUtils to BMUtils or BaseMUtils to better reflect the new class name. This would improve code readability and maintain consistency throughout the test file.


Line range hint 7-153: Well-structured and comprehensive test suite.

The test suite demonstrates a thorough approach to validating the BaseMlUtils class functionality. It covers a wide range of scenarios, including typical cases, invalid inputs, and edge cases for various methods such as preprocess_data, get_lag_and_roll_features, and get_time_and_cyclic_features.

To further enhance the test suite, consider adding property-based tests using libraries like Hypothesis. This could help uncover edge cases that might be missed with traditional unit tests, especially for complex data transformations.

Would you like me to provide an example of how to implement a property-based test for one of the methods?
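As one illustration of the idea: a property-style check for a rolling-feature transform. Everything below is a sketch — `add_roll_mean` is a hypothetical stand-in for the corresponding `BaseMlUtils` step, and plain `random` substitutes for Hypothesis's strategy and shrinking machinery:

```python
import random

import pandas as pd


def add_roll_mean(df: pd.DataFrame, target: str, window: int) -> pd.DataFrame:
    """Hypothetical stand-in for the rolling-feature step in BaseMlUtils:
    per-device rolling mean of `target` over `window` readings."""
    out = df.copy()
    out[f"{target}_roll_{window}"] = (
        out.groupby("device_id")[target]
        .transform(lambda s: s.rolling(window, min_periods=1).mean())
    )
    return out


# Property-style check over randomized inputs: a rolling mean must stay
# within [min, max] of that device's readings, and the transform must not
# add or drop rows. Hypothesis would generate and shrink these frames
# automatically; the hand-rolled loop shows the same idea.
random.seed(0)
for _ in range(50):
    n = random.randint(1, 25)
    frame = pd.DataFrame(
        {
            "device_id": [random.choice("ab") for _ in range(n)],
            "pm2_5": [random.uniform(0.0, 500.0) for _ in range(n)],
        }
    )
    result = add_roll_mean(frame, "pm2_5", window=3)
    assert len(result) == len(frame)
    for _, grp in result.groupby("device_id"):
        lo, hi = grp["pm2_5"].min(), grp["pm2_5"].max()
        assert grp["pm2_5_roll_3"].between(lo - 1e-9, hi + 1e-9).all()
```

Checking invariants like these tends to catch group-boundary and empty-window bugs that hand-picked fixtures miss.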

src/workflows/airqo_etl_utils/config.py (2)

38-38: Excellent addition of the BIGQUERY_SATELLITE_DATA_TABLE variable.

The new variable aligns well with the existing configuration pattern and supports the PR objective of incorporating satellite data for pollution predictions. Well done!

As a minor suggestion, consider adding a brief comment above this line to explain the purpose of this table, which could be helpful for future developers working on this configuration.


264-264: Great addition of the SATELLITE_TRAINING_SCOPE variable.

This new variable fits perfectly with the existing configuration structure and supports the PR's goal of setting up satellite data processing for pollution predictions. Excellent work!

For consistency with other similar variables in this section (e.g., HOURLY_FORECAST_TRAINING_JOB_SCOPE), you might consider renaming it to SATELLITE_TRAINING_JOB_SCOPE. This would maintain a uniform naming convention across related variables.
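Sketched together, the two settings might look like this — the `os.getenv` pattern and class layout are assumptions about config.py's style, and the rename to `SATELLITE_TRAINING_JOB_SCOPE` reflects the suggestion above rather than the merged code:

```python
import os


class Config:
    # Table holding raw satellite readings consumed by the new DAGs.
    BIGQUERY_SATELLITE_DATA_TABLE = os.getenv("BIGQUERY_SATELLITE_DATA_TABLE")
    # Renamed from SATELLITE_TRAINING_SCOPE to mirror the naming of
    # HOURLY_FORECAST_TRAINING_JOB_SCOPE in the same section.
    SATELLITE_TRAINING_JOB_SCOPE = os.getenv("SATELLITE_TRAINING_JOB_SCOPE")
```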

src/workflows/dags/fault_detection_job.py (1)

Line range hint 3-46: Update documentation to reflect changes

With the transition from MlUtils to FaultDetectionUtils, please ensure that all relevant documentation, docstrings, and inline comments are updated to reflect these changes. This will aid in maintaining code clarity and assist other developers in understanding the updated workflow.

src/workflows/dags/satellite_training.py (1)

57-62: Consider removing or updating commented-out code related to 'lag_features_extraction'

The lag_features_extraction function and its invocation are currently commented out. If this code is no longer necessary, consider removing it to clean up the code. If it's intended for future development, adding a TODO comment or proper documentation might clarify its purpose and future plans.

Also applies to: 71-71

src/workflows/airqo_etl_utils/satellilte_utils.py (1)

125-125: Consider removing the debugging print(features) statement

The print(features) statement seems intended for debugging purposes. If it's no longer needed, removing it can help clean up the console output.

src/workflows/dags/forecast_prediction_jobs.py (3)

193-194: Incomplete TODO in formatting_variables function

There's a TODO comment indicating that the formatting_variables function should be modified to take in two datasets (line 193). Currently, it only processes a single dataset.

Would you like assistance in updating this function to handle both the ground monitor data and satellite data? Adjusting this function might improve data preprocessing and integration.


222-225: Redundant function save_hourly_forecasts_to_mongo

The function save_hourly_forecasts_to_mongo (lines 223-225) shares the same name as a function in the make_forecasts DAG. To avoid confusion and potential conflicts, especially when importing modules or if both DAGs are executed in the same context, consider renaming this function to reflect its purpose within the training_job DAG.

For example, rename it to save_predictions_to_mongo:

 @task()
-def save_hourly_forecasts_to_mongo(data):
+def save_predictions_to_mongo(data):
     ForecastUtils.save_forecasts_to_mongo(data, "hourly")

163-169: Missing description for training_job DAG

Providing a description for your DAGs enhances readability and maintainability. The training_job DAG currently lacks a description.

Consider adding a doc_md parameter to the DAG definition to describe its purpose:

@dag(
    "AirQo-satellite-model-prediction-job",
    schedule="0 1 * * *",
    default_args=AirflowUtils.dag_default_configs(),
    tags=["airqo", "hourly-forecast", "daily-forecast", "training-job", "satellite"],
    doc_md="""
    ### AirQo Satellite Model Prediction Job

    This DAG trains a model using historical satellite and ground monitor data to predict air quality parameters.
    """,
)
def training_job():
src/workflows/airqo_etl_utils/ml_utils.py (4)

60-60: Consider improving class abstractions for better code organization.

There's a TODO comment indicating that the current abstractions may need improvement. Enhancing the abstractions can lead to better maintainability and code clarity. Would you like assistance in reviewing and refining the class design?


847-872: Remove or document large blocks of commented-out code.

There are significant sections of commented-out code in this area. Keeping large blocks of commented code can clutter the codebase and hinder readability. Consider removing this code or providing clear documentation on why it is retained.


783-783: Correct the error message to accurately reflect the checked columns.

The error message mentions 'city or country' columns, but only 'city' is being checked. Update the message to reflect the actual column being validated.

Apply this diff to fix the error message:

- raise ValueError("data frame does not contain city or country column")
+ raise ValueError("data frame does not contain 'city' column")

830-830: Ensure consistency in error messages.

The error message uses 'freq' while the parameter name is 'frequency'. Consider updating the message for consistency and clarity.

Apply this diff to fix the error message:

- raise ValueError("freq must be daily or hourly")
+ raise ValueError("frequency must be 'daily' or 'hourly'")
src/workflows/airqo_etl_utils/bigquery_api.py (2)

911-911: Remove redundant empty string in query concatenation.

There's an unnecessary empty string concatenated at the end of the query, which can be safely removed for cleaner code.

Apply this diff to clean up the query construction:

- query += "ORDER BY timestamp" ""
+ query += "ORDER BY timestamp"

856-856: Clarify exception messages for invalid date inputs.

Providing more detailed exception messages can aid in debugging. Specify which parameter has the invalid date.

For example:

-   raise ValueError(f"Invalid start date time: {start_date_time}")
+   raise ValueError(f"Invalid start_date_time '{start_date_time}' provided.")

Also applies to: 900-900

🧰 Tools
🪛 Ruff

856-856: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 15d4379 and 93aa3ea.

📒 Files selected for processing (11)
  • src/workflows/airqo_etl_utils/bigquery_api.py (4 hunks)
  • src/workflows/airqo_etl_utils/config.py (2 hunks)
  • src/workflows/airqo_etl_utils/ml_utils.py (9 hunks)
  • src/workflows/airqo_etl_utils/satellilte_utils.py (1 hunks)
  • src/workflows/airqo_etl_utils/tests/test_big_query_api.py (3 hunks)
  • src/workflows/airqo_etl_utils/tests/test_ml_utils.py (1 hunks)
  • src/workflows/dags/fault_detection_job.py (2 hunks)
  • src/workflows/dags/forecast_prediction_jobs.py (1 hunks)
  • src/workflows/dags/forecast_training_jobs.py (4 hunks)
  • src/workflows/dags/ml_prediction_jobs.py (0 hunks)
  • src/workflows/dags/satellite_training.py (1 hunks)
💤 Files with no reviewable changes (1)
  • src/workflows/dags/ml_prediction_jobs.py
🧰 Additional context used
🪛 Ruff
src/workflows/airqo_etl_utils/bigquery_api.py

856-856: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


900-900: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

src/workflows/airqo_etl_utils/ml_utils.py

782-782: Test for membership should be not in

Convert to not in

(E713)


874-874: f-string without any placeholders

Remove extraneous f prefix

(F541)

src/workflows/airqo_etl_utils/satellilte_utils.py

90-90: Local variable e is assigned to but never used

Remove assignment to unused variable e

(F841)


91-93: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)


95-95: Undefined name additional_columns

(F821)


148-148: Local variable train_target is assigned to but never used

Remove assignment to unused variable train_target

(F841)


148-148: Local variable validation_target is assigned to but never used

Remove assignment to unused variable validation_target

(F841)


148-148: Local variable test_target is assigned to but never used

Remove assignment to unused variable test_target

(F841)


154-154: Undefined name optuna

(F821)


155-155: Undefined name optuna

(F821)


158-158: Local variable study is assigned to but never used

Remove assignment to unused variable study

(F841)


158-158: Undefined name optuna

(F821)

src/workflows/dags/forecast_prediction_jobs.py

219-219: Undefined name project_id

(F821)


219-219: Undefined name bucket

(F821)


231-231: Local variable encoded_data is assigned to but never used

Remove assignment to unused variable encoded_data

(F841)


232-232: Local variable time_data is assigned to but never used

Remove assignment to unused variable time_data

(F841)


234-234: Local variable predictions is assigned to but never used

Remove assignment to unused variable predictions

(F841)

src/workflows/dags/forecast_training_jobs.py

89-89: Redefinition of unused get_location_features from line 50

(F811)

src/workflows/dags/satellite_training.py

23-23: datetime.timedelta imported but unused

Remove unused import: datetime.timedelta

(F401)

🔇 Additional comments (17)
src/workflows/airqo_etl_utils/config.py (1)

Line range hint 38-264: Excellent work on improving code readability through indentation corrections.

The adjustments to the indentation of various configuration variables, including BIGQUERY_HOURLY_WEATHER_TABLE, BIGQUERY_OPENWEATHERMAP_TABLE, BIGQUERY_ANALYTICS_TABLE, HOURLY_FORECAST_HORIZON, DAILY_FORECAST_HORIZON, and MLFLOW_TRACKING_URI, significantly enhance the overall code structure and consistency. These changes, while not altering functionality, contribute to a more maintainable and easily readable codebase. Well done on this attention to detail!

src/workflows/dags/fault_detection_job.py (10)

3-3: Updated import to use FaultDetectionUtils

The import statement has been updated to use FaultDetectionUtils, reflecting the shift from MlUtils to the new utility class.


25-27: Addition of get_time_features task

The new task get_time_features appropriately calls FaultDetectionUtils.get_time_features(data, "hourly").


28-30: Addition of get_cyclic_features task

The new task get_cyclic_features correctly invokes FaultDetectionUtils.get_cyclic_features(data, "hourly").


34-34: Updated flag_pattern_based_faults function call

The function call has been updated to use FaultDetectionUtils.flag_pattern_based_faults(data), aligning with the new utility class.


38-38: Updated process_faulty_devices_percentage function call

The call to FaultDetectionUtils.process_faulty_devices_percentage(data) reflects the intended update.


42-42: Updated process_faulty_devices_sequence function call

The function now uses FaultDetectionUtils.process_faulty_devices_fault_sequence(data), which is consistent with the utility update.


50-52: Updated task dependencies to include new feature extraction tasks

The DAG now includes get_time_features and get_cyclic_features tasks. The data flow:

  • raw_data → get_time_features → time_features
  • time_features → get_cyclic_features → cyclic_features
  • cyclic_features → flag_pattern_based_faults

This sequence ensures that time and cyclic features are extracted before pattern-based fault detection, which is appropriate.
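For context on what the new feature tasks compute, here is a minimal sketch of a cyclic encoder; the real `FaultDetectionUtils.get_cyclic_features` implementation may differ, and the `timestamp`/`hour_sin`/`hour_cos` column names are assumptions:

```python
import numpy as np
import pandas as pd


def get_cyclic_features(df: pd.DataFrame, frequency: str) -> pd.DataFrame:
    """Minimal sketch: project hour-of-day onto the unit circle so that
    23:00 and 00:00 come out adjacent rather than 23 units apart."""
    if frequency != "hourly":
        raise ValueError("frequency must be 'hourly'")
    out = df.copy()
    hours = out["timestamp"].dt.hour
    out["hour_sin"] = np.sin(2 * np.pi * hours / 24)
    out["hour_cos"] = np.cos(2 * np.pi * hours / 24)
    return out


df = pd.DataFrame(
    {"timestamp": pd.to_datetime(["2024-08-22 00:00", "2024-08-22 23:00"])}
)
encoded = get_cyclic_features(df, "hourly")
# On the circle, hour 0 and hour 23 are one step apart, so the encoded
# points sit close together even though the raw hour values differ by 23.
```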


46-46: Ensure correct unpacking of data in save_faulty_devices

The save_to_mongo task calls FaultDetectionUtils.save_faulty_devices(*data). Please verify that the save_faulty_devices method accepts the correct number of arguments when using the unpacking operator *data.

#!/bin/bash
# Description: Verify the signature of 'save_faulty_devices' in FaultDetectionUtils

# Expected Result: The method should accept the appropriate parameters to handle the unpacked data.
ast-grep --lang=python --pattern='def save_faulty_devices($_):' src/airqo_etl_utils/ml_utils.py

34-46: Ensure robust error handling in FaultDetectionUtils methods

Please ensure that the methods from FaultDetectionUtils used in these tasks handle exceptions appropriately. This includes methods like flag_pattern_based_faults, process_faulty_devices_percentage, process_faulty_devices_fault_sequence, and save_faulty_devices. Proper error handling will improve the reliability of the DAG.

#!/bin/bash
# Description: Verify that FaultDetectionUtils methods include exception handling

# Expected Result: Methods should contain try-except blocks or handle potential exceptions.
rg --type py -A 5 'def flag_pattern_based_faults' src/airqo_etl_utils/ml_utils.py
rg --type py -A 5 'def process_faulty_devices_percentage' src/airqo_etl_utils/ml_utils.py
rg --type py -A 5 'def process_faulty_devices_fault_sequence' src/airqo_etl_utils/ml_utils.py
rg --type py -A 5 'def save_faulty_devices' src/airqo_etl_utils/ml_utils.py

22-22: Ensure correct implementation of flag_rule_based_faults

The function call to FaultDetectionUtils.flag_rule_based_faults(data) seems appropriate. To confirm that flag_rule_based_faults is correctly defined in FaultDetectionUtils, you can verify its presence in the codebase.

✅ Verification successful

Let's gather more information to verify the presence of flag_rule_based_faults.


Verified: flag_rule_based_faults is correctly implemented in FaultDetectionUtils.

  • Defined in src/workflows/airqo_etl_utils/ml_utils.py
🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that 'flag_rule_based_faults' method exists in FaultDetectionUtils

# Expected Result: The method definition should be found within FaultDetectionUtils class.
ast-grep --lang=python --pattern=$'class FaultDetectionUtils:\n    def flag_rule_based_faults($_):\n        $$$' src/airqo_etl_utils/ml_utils.py

Length of output: 489


Script:

#!/bin/bash
# Description: Locate and verify the 'flag_rule_based_faults' method in the codebase.

# Find all Python files
fd --extension py --type f > python_files.txt

# Search for the method definition in all Python files
while IFS= read -r file; do
    rg -A 5 'def flag_rule_based_faults' "$file"
done < python_files.txt

Length of output: 29768

src/workflows/dags/satellite_training.py (1)

28-28: ⚠️ Potential issue

Avoid redundant import of 'date_to_str'

The date_to_str function is already imported at the top of the file. Re-importing it within the function is unnecessary and can be removed to improve code readability.

Apply this diff to remove the redundant import:

-        from airqo_etl_utils.date import date_to_str

Likely invalid or redundant comment.

src/workflows/dags/forecast_prediction_jobs.py (2)

223-225: Missing task to save predictions

The predictions variable (line 234) is assigned but its value is not used or saved. To complete the pipeline, you might want to save these predictions to MongoDB or another storage.

Please confirm if there's a missing task to save the predictions. If so, consider adding a task like save_predictions_to_mongo(predictions) to handle this.

Here's how you might implement it:

@task()
def save_predictions_to_mongo(data):
    ForecastUtils.save_forecasts_to_mongo(data, "hourly")

# Add the task to the pipeline
save_predictions_to_mongo(predictions)

16-21: Duplicate tags in DAG definitions

Both DAGs are scheduled to run at the same time ("0 1 * * *") and have overlapping tags such as "hourly-forecast" and "daily-forecast". While this might be intentional, it's important to ensure that the DAGs are appropriately distinguished in Airflow for clarity and maintenance.

Please verify if the scheduling and tagging are set as intended. If the DAGs serve different purposes, consider updating the tags to reflect their unique functionalities.

src/workflows/airqo_etl_utils/bigquery_api.py (3)

59-59: Initialization of self.satellite_data_table is appropriate.

The addition of self.satellite_data_table helps manage satellite data configurations effectively.


858-882: SQL query in fetch_device_data_for_satellite_job is well-structured.

The SQL query effectively retrieves and aggregates the necessary data for satellite jobs, including appropriate filtering and grouping.


Line range hint 809-847: fetch_device_data_for_forecast_job method is correctly updated.

The method accurately fetches device data for forecast jobs and includes appropriate joins with the sites table.

Resolved review thread: src/workflows/dags/fault_detection_job.py
default_args=AirflowUtils.dag_default_configs(),
tags=["airqo", "hourly-forecast", "daily-forecast", "training-job", "satellite"],
)
def train_satelllite_model():

⚠️ Potential issue

Correct the function name 'train_satelllite_model' to 'train_satellite_model'

There is a typo in the function name train_satelllite_model(); it contains an extra 't' in 'satellite'. This occurs in both the function definition and the invocation. Correcting the spelling will prevent potential confusion and maintain consistency.

Apply this diff to fix the typo:

-def train_satelllite_model():
+def train_satellite_model():
...
-train_satelllite_model()
+train_satellite_model()

Also applies to: 75-75

def train_satelllite_model():
@task()
def fetch_historical_satellite_data():
from datetime import datetime, timedelta, timezone

⚠️ Potential issue

Simplify imports by removing unused and redundant imports

In line 23, timedelta is imported but not used, and datetime is already imported at the top of the file. Re-importing datetime within the function is unnecessary. Adjusting the import will enhance code clarity.

Apply this diff to modify the import:

-        from datetime import datetime, timedelta, timezone
+        from datetime import timezone
🧰 Tools
🪛 Ruff

23-23: datetime.timedelta imported but unused

Remove unused import: datetime.timedelta

(F401)

Resolved review threads:
  • src/workflows/dags/satellite_training.py (outdated)
  • src/workflows/dags/forecast_training_jobs.py
  • src/workflows/airqo_etl_utils/ml_utils.py (outdated, ×2)
  • src/workflows/airqo_etl_utils/bigquery_api.py (×3, two outdated)
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (3)
src/workflows/airqo_etl_utils/bigquery_api.py (3)

853-856: Consider updating exception handling.

As previously suggested, consider updating the exception handling to provide more context:

    try:
        pd.to_datetime(start_date_time)
    except ValueError as e:
-       raise ValueError(f"Invalid start date time: {start_date_time}")
+       raise ValueError(f"Invalid start date time: {start_date_time}") from e

This change will preserve the original traceback and provide more context for debugging.


848-891: LGTM: Well-implemented satellite data retrieval method.

The new fetch_device_data_for_satellite_job method is well-structured and appropriately aggregates data for satellite comparison. The exception handling is correctly implemented, addressing the previous suggestion.

One minor suggestion for improvement:

Consider adding a comment explaining the significance of the specific cities in the query:

# These cities are the primary focus for our satellite data analysis
AND t2.city IN ('Kampala', 'Nairobi', 'Kisumu', 'Lagos', 'Accra', 'Bujumbura', 'Yaounde')

This will help future developers understand the reasoning behind this filter.


892-920: LGTM with suggestions: New method for fetching satellite readings.

The fetch_satellite_readings method is a good addition for retrieving satellite data. However, there are a few points to consider for improvement:

  1. Exception handling: Update the exception handling as previously suggested:

    except ValueError as e:
        raise ValueError(f"Invalid start date time: {start_date_time}") from e
  2. Query optimization: Consider selecting only necessary columns instead of using SELECT DISTINCT * to optimize performance, especially if the satellite data table is large.

  3. Job type handling: Add handling for cases where job_type is not "train". You might want to add an else clause or handle other job types explicitly.

  4. Date filtering: Consider adding a date filter even when job_type is not "train" to avoid fetching the entire table.

Would you like assistance in implementing these suggestions?

🧰 Tools
🪛 Ruff

900-900: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)
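Taken together, the four suggestions could look roughly like the helper below. The column list, the "predict" job type, and the standalone function are illustrative assumptions, not the merged implementation:

```python
import pandas as pd

# Assumed column set; the real satellite table schema may differ.
SATELLITE_COLUMNS = ("timestamp", "latitude", "longitude", "pm2_5")


def build_satellite_query(table: str, job_type: str, start_date_time: str) -> str:
    """Build the satellite-readings query with an explicit column list
    (instead of SELECT DISTINCT *), a validated job_type, and a date
    filter applied for every job type, not just training."""
    try:
        pd.to_datetime(start_date_time)
    except ValueError as e:
        # Chain the parse error (Ruff B904) so the traceback stays useful.
        raise ValueError(
            f"Invalid start_date_time '{start_date_time}' provided."
        ) from e
    if job_type not in ("train", "predict"):
        raise ValueError("job_type must be 'train' or 'predict'")
    columns = ", ".join(SATELLITE_COLUMNS)
    return (
        f"SELECT DISTINCT {columns} FROM `{table}` "
        f"WHERE timestamp >= '{start_date_time}' "
        "ORDER BY timestamp"
    )


query = build_satellite_query(
    "airqo-project.averages.satellite_data", "train", "2024-01-01"
)
```

In production code the date would go through BigQuery query parameters rather than string interpolation, which also sidesteps injection concerns.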

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 93aa3ea and b317bd1.

📒 Files selected for processing (1)
  • src/workflows/airqo_etl_utils/bigquery_api.py (4 hunks)
🧰 Additional context used
🪛 Ruff
src/workflows/airqo_etl_utils/bigquery_api.py

900-900: Within an except clause, raise exceptions with raise ... from err or raise ... from None to distinguish them from errors in exception handling

(B904)

🔇 Additional comments (2)
src/workflows/airqo_etl_utils/bigquery_api.py (2)

59-59: LGTM: New attribute for satellite data table.

The addition of self.satellite_data_table is consistent with the class structure and will be useful for the new satellite data-related methods.


Line range hint 809-847: LGTM: Improved method naming and query update.

The renaming of fetch_data to fetch_device_data_for_forecast_job enhances clarity. The update to use self.hourly_measurements_table_prod is noted and assumed to be intentional for production data retrieval.

@Baalmart Baalmart merged commit 276bf9d into airqo-platform:staging Oct 11, 2024
44 checks passed
@Baalmart Baalmart mentioned this pull request Oct 11, 2024
8 tasks