Update fix/clean up #4062

Merged: 5 commits, Dec 13, 2024
2 changes: 1 addition & 1 deletion src/analytics/api/models/events.py
@@ -589,7 +589,7 @@ def simple_data_cleaning(cls, data: pd.DataFrame) -> pd.DataFrame:

data.loc[data["pm2_5_raw_value"] != 0, "pm2_5"] = np.nan

-        if (data["pm2_5_raw_value"] == 0).all():
+        if ((data["pm2_5_raw_value"] == 0) | (data["pm2_5_raw_value"].isna())).all():
data.drop(columns=["pm2_5_raw_value"], inplace=True)

zero_columns = data.loc[:, (data == 0).all()].columns
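To see why the broadened check matters, here is a small self-contained illustration (toy data of my own, not from the PR): a raw column that is part zeros and part NaNs fails the old all-zeros test but passes the new one, so it now gets dropped.

```python
import numpy as np
import pandas as pd

# Toy frame: pm2_5_raw_value carries no usable signal (only zeros and NaNs).
df = pd.DataFrame({"pm2_5": [12.0, 15.0], "pm2_5_raw_value": [0.0, np.nan]})

old_check = (df["pm2_5_raw_value"] == 0).all()  # False, because NaN != 0
new_check = ((df["pm2_5_raw_value"] == 0) | (df["pm2_5_raw_value"].isna())).all()  # True

if new_check:
    df.drop(columns=["pm2_5_raw_value"], inplace=True)
print(old_check, new_check, list(df.columns))  # False True ['pm2_5']
```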
36 changes: 33 additions & 3 deletions src/workflows/airqo_etl_utils/bigquery_api.py
@@ -604,22 +604,52 @@ def reload_data(
self,
dataframe: pd.DataFrame,
table: str,
-        tenant: Tenant = Tenant.ALL,
+        network: str = "all",
start_date_time: str = None,
end_date_time: str = None,
where_fields: dict = None,
null_cols: list = None,
) -> None:
"""
Reloads data into a specified table in BigQuery by:
1. Deleting existing records in the table based on the provided date range,
network, and optional filtering criteria.
2. Inserting new records from the provided DataFrame.

Args:
dataframe (pd.DataFrame): The data to be reloaded into the table.
table (str): The target table in BigQuery.
network (str, optional): The network filter to be applied. Defaults to "all".
start_date_time (str, optional): The start of the date range for deletion.
If None, inferred from the DataFrame's earliest timestamp.
end_date_time (str, optional): The end of the date range for deletion.
If None, inferred from the DataFrame's latest timestamp.
where_fields (dict, optional): Additional fields and values for filtering rows to delete.
null_cols (list, optional): Columns to filter on `NULL` values during deletion.

Returns:
None: The function performs operations directly on the BigQuery table.

Raises:
ValueError: If `timestamp` column is missing in the DataFrame.
"""

if start_date_time is None or end_date_time is None:
-            data = dataframe.copy()
+            if "timestamp" not in dataframe.columns:
+                raise ValueError(
+                    "The DataFrame must contain a 'timestamp' column to derive the date range."
+                )
+            data = (
+                dataframe.copy()
+            )  # Not sure why this dataframe is being copied. # Memory wastage?
Comment on lines +643 to +644
🛠️ Refactor suggestion

Clarify the purpose of copying the DataFrame or remove the unnecessary copy

The comment `# Not sure why this dataframe is being copied. # Memory wastage?` suggests uncertainty about the necessity of copying the DataFrame. If the copy is not required, consider removing `data = dataframe.copy()` to conserve memory. If it is necessary, please provide an explanation to clarify its purpose for future reference.
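If the copy exists only so that the timestamp conversion does not mutate the caller's DataFrame, one way to act on the suggestion (a sketch, assuming `date_to_str` behaves as elsewhere in `airqo_etl_utils`) is to convert into a local Series inside `reload_data`:

```python
# Sketch: derive the deletion window without copying the whole DataFrame.
# A local Series leaves the caller's frame untouched and avoids duplicating
# every column just to read the min/max timestamp.
timestamps = pd.to_datetime(dataframe["timestamp"])
start_date_time = date_to_str(timestamps.min())
end_date_time = date_to_str(timestamps.max())
```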

data["timestamp"] = pd.to_datetime(data["timestamp"])
start_date_time = date_to_str(data["timestamp"].min())
end_date_time = date_to_str(data["timestamp"].max())

query = self.compose_query(
QueryType.DELETE,
table=table,
-            tenant=tenant,
+            network=network,
start_date_time=start_date_time,
end_date_time=end_date_time,
where_fields=where_fields,
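For orientation, a hypothetical call site under the new signature might look like the following. The class name, table id, network value, and data are illustrative assumptions, not taken from the PR; leaving `start_date_time` and `end_date_time` unset exercises the new inference path, which now raises if the `timestamp` column is absent.

```python
import pandas as pd

# Hypothetical usage of the updated reload_data signature; every literal
# below (table id, network, timestamps) is a made-up placeholder.
hourly_df = pd.DataFrame(
    {
        "timestamp": ["2024-12-01T00:00:00Z", "2024-12-01T01:00:00Z"],
        "device_id": ["dev-a", "dev-a"],
        "pm2_5": [12.3, 14.1],
    }
)

api = BigQueryApi()  # assumed class exported by airqo_etl_utils.bigquery_api
api.reload_data(
    dataframe=hourly_df,
    table="project.dataset.hourly_measurements",
    network="airqo",
)
```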
68 changes: 42 additions & 26 deletions src/workflows/airqo_etl_utils/daily_data_utils.py
@@ -8,31 +8,47 @@
class DailyDataUtils:
@staticmethod
def average_data(data: pd.DataFrame) -> pd.DataFrame:
-        averaged_data = pd.DataFrame()
-        data["timestamp"] = data["timestamp"].apply(pd.to_datetime)
-
-        for _, by_tenant in data.groupby("tenant"):
-            tenant = by_tenant.iloc[0]["tenant"]
-            del by_tenant["tenant"]
-            for _, by_device in by_tenant.groupby("device_id"):
-                site_id = by_device.iloc[0]["site_id"]
-                device_id = by_device.iloc[0]["device_id"]
-                device_number = by_device.iloc[0]["device_number"]
-
-                del by_device["site_id"]
-                del by_device["device_id"]
-                del by_device["device_number"]
-
-                device_averages = by_device.resample("1D", on="timestamp").mean()
-                device_averages["timestamp"] = device_averages.index
-                device_averages["device_id"] = device_id
-                device_averages["site_id"] = site_id
-                device_averages["device_number"] = device_number
-                device_averages["tenant"] = tenant
-
-                averaged_data = pd.concat(
-                    [averaged_data, device_averages], ignore_index=True
-                )
"""
Averages data in a pandas DataFrame on a daily basis for each device,
grouped by network and device ID. The function resamples data
to compute daily averages for numerical columns.

Args:
data (pd.DataFrame): A pandas DataFrame containing the following columns:
- "timestamp": Timestamps of the data.
- "network": The network the data belongs to.
- "device_id": Unique identifier for the device.
- "site_id": Unique identifier for the site associated with the device.
- "device_number": Device number.

Returns:
pd.DataFrame: A DataFrame containing daily averages for each device,
including metadata columns such as "tenant", "device_id", "site_id",
and "device_number".
"""
Comment on lines +11 to +28
🛠️ Refactor suggestion

Update docstring to replace 'tenant' with 'network' for consistency

In the Returns section of the docstring for average_data, it mentions metadata columns such as "tenant", "device_id", "site_id", and "device_number". However, the code uses "network" instead of "tenant". Please update the docstring to reflect "network" to maintain consistency and avoid confusion.
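One possible rewording of that Returns block (my suggestion, not something committed in this PR):

```python
        Returns:
            pd.DataFrame: A DataFrame containing daily averages for each device,
                including metadata columns such as "network", "device_id", "site_id",
                and "device_number".
```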

data["timestamp"] = pd.to_datetime(data["timestamp"])

averaged_data_list = []

for (network, device_id), group in data.groupby(["network", "device_id"]):
network = group["network"].iloc[0]
site_id = group["site_id"].iloc[0]
device_number = group["device_number"].iloc[0]

device_averages = (
group.resample("1D", on="timestamp")
.mean(numeric_only=True)
.reset_index()
)

device_averages["network"] = network
device_averages["device_id"] = device_id
device_averages["site_id"] = site_id
device_averages["device_number"] = device_number

averaged_data_list.append(device_averages)

averaged_data = pd.concat(averaged_data_list, ignore_index=True)

return averaged_data
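As a sanity check on the new grouping logic, here is a small toy run (my own example data, not part of the PR): two devices on one network, where dev-a's two readings on the same day collapse to one daily mean.

```python
import pandas as pd

# Assumes DailyDataUtils from airqo_etl_utils.daily_data_utils is importable.
# Toy input: dev-a has two readings on 2024-12-01, dev-b has one.
data = pd.DataFrame(
    {
        "timestamp": ["2024-12-01 00:00", "2024-12-01 12:00", "2024-12-01 06:00"],
        "network": ["airqo", "airqo", "airqo"],
        "device_id": ["dev-a", "dev-a", "dev-b"],
        "site_id": ["s1", "s1", "s2"],
        "device_number": [1, 1, 2],
        "pm2_5": [10.0, 20.0, 30.0],
    }
)

daily = DailyDataUtils.average_data(data)
# Expected: dev-a averages to pm2_5 == 15.0 for 2024-12-01, dev-b keeps 30.0,
# and network/site_id/device_number are re-attached to each daily row.
print(daily[["timestamp", "device_id", "pm2_5"]])
```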

@@ -77,7 +93,7 @@ def cleanup_and_reload(
)

bigquery_api.reload_data(
-            tenant=Tenant.ALL,
+            network="all",
table=table,
dataframe=data,
start_date_time=start_date_time,