
Commit 7d07b5a

Merge 8cdb3c5 into a9e9555
BarisSari authored Feb 17, 2021
2 parents a9e9555 + 8cdb3c5 commit 7d07b5a
Showing 10 changed files with 850 additions and 74 deletions.
.isort.cfg (2 changes: 1 addition & 1 deletion)
@@ -4,4 +4,4 @@ include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
line_length = 100
- known_third_party =alembic,dateutil,geoalchemy2,geopy,halo,iterfzf,loguru,pg8000,pint,prompt_toolkit,pyfiglet,pygments,pytest,setuptools,shapely,sqlalchemy,tabulate,testing,tqdm
+ known_third_party =alembic,dateutil,geoalchemy2,geopy,halo,iterfzf,loguru,pg8000,pint,prompt_toolkit,pyfiglet,pygments,pytest,setuptools,shapely,sqlalchemy,sqlalchemy_utils,tabulate,testing,tqdm
docs/conf.py (6 changes: 1 addition & 5 deletions)
@@ -63,11 +63,7 @@
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ["_static"]

- html_context = {
-     "css_files": [
-         "_static/theme_overrides.css",
-     ],  # override wide tables in RTD theme
- }
+ html_css_files = ["_static/theme_overrides.css"]  # override wide tables in RTD theme


# -- Extension configuration -------------------------------------------------
migrations/latest_revisions.json (2 changes: 1 addition & 1 deletion)
@@ -1,4 +1,4 @@
{
"LATEST_POSTGRES_VERSION": "1be821cb7908",
"LATEST_SQLITE_VERSION": "cb9a5f93f0aa"
"LATEST_SQLITE_VERSION": "a382a0f4b1be"
}
New migration file (37 additions)
@@ -0,0 +1,37 @@
"""Correct privacy_id type of HostedBy
Revision ID: a382a0f4b1be
Revises: cb9a5f93f0aa
Create Date: 2021-02-10 16:20:41.425779+00:00
"""
import sqlalchemy as sa
from alembic import op

import pepys_import

# revision identifiers, used by Alembic.
revision = "a382a0f4b1be"
down_revision = "cb9a5f93f0aa"
branch_labels = None
depends_on = None


def upgrade():
with op.batch_alter_table("HostedBy", schema=None) as batch_op:
batch_op.alter_column(
"privacy_id",
existing_type=sa.INTEGER(),
type_=pepys_import.utils.sqlalchemy_utils.UUIDType(length=16),
existing_nullable=False,
)


def downgrade():
with op.batch_alter_table("HostedBy", schema=None) as batch_op:
batch_op.alter_column(
"privacy_id",
existing_type=pepys_import.utils.sqlalchemy_utils.UUIDType(length=16),
type_=sa.INTEGER(),
existing_nullable=False,
)
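Note: SQLite cannot change a column's type in place, which is why the migration uses Alembic's batch mode; batch_alter_table copies the table, recreates it with the new column type, and moves the data back. As a hedged sketch (not part of this commit), the revision could be applied or reverted with Alembic's command API, assuming a conventional alembic.ini setup:

    # Minimal sketch; the config path is an assumption, not from this repository.
    from alembic import command
    from alembic.config import Config

    config = Config("alembic.ini")  # assumed config location
    command.upgrade(config, "a382a0f4b1be")  # apply this revision
    # command.downgrade(config, "cb9a5f93f0aa")  # or roll back to the previous one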
pepys_admin/maintenance/gui.py (3 changes: 2 additions & 1 deletion)
@@ -44,6 +44,7 @@
from pepys_admin.maintenance.widgets.dropdown_box import DropdownBox
from pepys_admin.maintenance.widgets.filter_widget import FilterWidget
from pepys_admin.maintenance.widgets.filter_widget_utils import filter_widget_output_to_query
+ from pepys_import.core.store import constants
from pepys_import.core.store.data_store import DataStore

logger.remove()
@@ -335,7 +336,7 @@ def do_merge(platform_list, master_platform, set_percentage=None, is_cancelled=N
            # outside of the GUI context too)
            set_percentage(10)
            with self.data_store.session_scope():
-               self.data_store.merge_platforms(platform_list, master_platform)
+               self.data_store.merge_generic(constants.PLATFORM, platform_list, master_platform)
            time.sleep(3)
            set_percentage(90)
            time.sleep(1)
pepys_import/core/store/common_db.py (2 changes: 1 addition & 1 deletion)
@@ -226,7 +226,7 @@ def get_sensor(
class TaskMixin:
    @declared_attr
    def parent(self):
-       return relationship("Task", lazy="joined", join_depth=1, innerjoin=True, uselist=False)
+       return relationship("Task")

    @declared_attr
    def parent_name(self):
pepys_import/core/store/data_store.py (180 changes: 126 additions & 54 deletions)
@@ -1,15 +1,17 @@
import os
import sys
+ import uuid
from contextlib import contextmanager
from datetime import datetime
from getpass import getuser
from importlib import import_module

- from sqlalchemy import create_engine, inspect, or_
+ from sqlalchemy import create_engine, inspect
from sqlalchemy.event import listen
from sqlalchemy.exc import ArgumentError, OperationalError
from sqlalchemy.orm import sessionmaker, undefer
from sqlalchemy.sql import func
+ from sqlalchemy_utils import merge_references

from paths import PEPYS_IMPORT_DIRECTORY
from pepys_import import __version__
@@ -31,6 +33,7 @@

from ...utils.error_handling import handle_first_connection_error
from ...utils.sqlalchemy_utils import get_primary_key_for_table
+ from ...utils.table_name_utils import table_name_to_class_name
from ...utils.text_formatting_utils import custom_print_formatted_text, format_error_message
from .db_base import BasePostGIS, BaseSpatiaLite
from .db_status import TableTypes
@@ -1649,7 +1652,7 @@ def get_logs_by_change_id(self, change_id):
            .all()
        )

-   def merge_platforms(self, platform_list, master_id) -> bool:
+   def merge_platforms(self, platform_list, master_id, change_id) -> bool:
        """Merges given platforms. Moves sensors from other platforms to the Target platform.
        If sensor with same name is already present on Target platform, moves measurements
        to that sensor. Also moves entities in Comments, Participants, LogsHoldings, Geometry, Media
@@ -1664,33 +1667,10 @@ def merge_platforms(self, platform_list, master_id) -> bool:
"""
Platform = self.db_classes.Platform
Sensor = self.db_classes.Sensor
State = self.db_classes.State
Contact = self.db_classes.Contact

if isinstance(master_id, Platform):
master_id = master_id.platform_id

master_platform = (
self.session.query(Platform).filter(Platform.platform_id == master_id).scalar()
)
if not master_platform:
raise ValueError(f"No platform found with the given master_id: '{master_id}'!")
self._check_master_id(Platform, master_id)
master_sensor_names = self.session.query(Sensor.name).filter(Sensor.host == master_id).all()
master_sensor_names = set([n for (n,) in master_sensor_names])

sensor_list = list()
if isinstance(platform_list[0], Platform): # Extract platform_ids from platform objects
platform_list = [p.platform_id for p in platform_list]

if master_id in platform_list:
platform_list.remove(master_id) # We don't need to change these values

reason_platform_list = ",".join([str(p) for p in platform_list])
change_id = self.add_to_changes(
user=USER,
modified=datetime.utcnow(),
reason=f"Merging Platforms '{reason_platform_list}' to '{master_id}'.",
).change_id
for p_id in platform_list:
self.update_platform_ids(p_id, master_id, change_id)

@@ -1717,36 +1697,12 @@ def merge_platforms(self, platform_list, master_id) -> bool:
                .filter(Sensor.host == master_id, Sensor.name == sensor.name)
                .scalar()
            )
-           query = self.session.query(State).filter(State.sensor_id == sensor.sensor_id)
-           [
-               self.add_to_logs(
-                   table=constants.STATE,
-                   row_id=s.sensor_id,
-                   field="sensor_id",
-                   new_value=str(sensor.sensor_id),
-                   change_id=change_id,
-               )
-               for s in query.all()
-           ]
-           query.update({"sensor_id": master_sensor_id})  # Update States
-
-           query = self.session.query(Contact).filter(
-               or_(Contact.sensor_id == sensor.sensor_id, Contact.subject_id == master_id)
+           self.merge_measurements(
+               constants.SENSOR, [sensor.sensor_id], master_sensor_id, change_id
            )
-           [
-               self.add_to_logs(
-                   table=constants.CONTACT,
-                   row_id=s.sensor_id,
-                   field="sensor_id",
-                   new_value=str(sensor.sensor_id),
-                   change_id=change_id,
-               )
-               for s in query.all()
-           ]
-           query.update({"sensor_id": master_sensor_id})  # Update Contacts

-       for p_id in platform_list:  # Delete merged platforms
-           self.session.query(Platform).filter(Platform.platform_id == p_id).delete()
+       # Delete merged platforms
+       self._delete_merged_objects(Platform, platform_list)
        self.session.flush()
        return True

@@ -1784,3 +1740,119 @@ def update_platform_ids(self, merge_platform_id, master_platform_id, change_id):
        except Exception:
            pass
        self.session.flush()
+
+   def _check_master_id(self, table_obj, master_id):
+       master_obj = (
+           self.session.query(table_obj)
+           .filter(getattr(table_obj, get_primary_key_for_table(table_obj)) == master_id)
+           .scalar()
+       )
+       if not master_obj:
+           raise ValueError(f"No object found with the given master_id: '{master_id}'!")
+       return master_obj
+
+   def _delete_merged_objects(self, table_obj, id_list):
+       # Delete merged objects
+       self.session.query(table_obj).filter(
+           getattr(table_obj, get_primary_key_for_table(table_obj)).in_(id_list)
+       ).delete(synchronize_session="fetch")
+       self.session.flush()
+
+   def merge_measurements(self, table_name, id_list, master_id, change_id):
+       if table_name == constants.SENSOR:
+           table_obj = self.db_classes.Sensor
+           field = "sensor_id"
+           self._check_master_id(table_obj, master_id)
+           table_objects = [self.db_classes.State, self.db_classes.Contact]
+       elif table_name == constants.DATAFILE:
+           table_obj = self.db_classes.Datafile
+           field = "source_id"
+           self._check_master_id(table_obj, master_id)
+           table_objects = [
+               self.db_classes.State,
+               self.db_classes.Contact,
+               self.db_classes.Activation,
+               self.db_classes.LogsHolding,
+               self.db_classes.Comment,
+               self.db_classes.Geometry1,
+               self.db_classes.Media,
+           ]
+       else:
+           raise ValueError(
+               f"You should give one of the following tables to merge measurements: "
+               f"{constants.SENSOR}, {constants.DATAFILE}"
+           )
+       values = ",".join(map(str, id_list))
+       for t_obj in table_objects:
+           query = self.session.query(t_obj).filter(getattr(t_obj, field).in_(id_list))
+           [
+               self.add_to_logs(
+                   table=t_obj.__tablename__,
+                   row_id=getattr(s, field),
+                   field=field,
+                   new_value=values,
+                   change_id=change_id,
+               )
+               for s in query.all()
+           ]
+           query.update({field: master_id}, synchronize_session="fetch")
+
+       # Delete merged objects
+       self._delete_merged_objects(table_obj, id_list)
+
+   def merge_objects(self, table_name, id_list, master_id, change_id):
+       # Table names are plural in the database, therefore make it singular
+       table = table_name_to_class_name(table_name)
+       table_obj = getattr(self.db_classes, table)
+       to_obj = self._check_master_id(table_obj, master_id)
+
+       for obj_id in id_list:
+           primary_key_field = get_primary_key_for_table(table_obj)
+           from_obj = (
+               self.session.query(table_obj)
+               .filter(getattr(table_obj, primary_key_field) == obj_id)
+               .scalar()
+           )
+           merge_references(from_obj, to_obj)
+           self.add_to_logs(
+               table=table_obj.__tablename__,
+               row_id=getattr(from_obj, primary_key_field),
+               field=primary_key_field,
+               new_value=str(obj_id),
+               change_id=change_id,
+           )
+           self.session.flush()
+       # Delete merged objects
+       self._delete_merged_objects(table_obj, id_list)
+
+   def merge_generic(self, table_name, id_list, master_id) -> bool:
+       reference_table_objects = self.meta_classes[TableTypes.REFERENCE]
+       reference_table_names = [obj.__tablename__ for obj in reference_table_objects]
+
+       table = table_name_to_class_name(table_name)
+       table_obj = getattr(self.db_classes, table)
+       if id_list and not isinstance(id_list[0], uuid.UUID):
+           id_list = [getattr(i, get_primary_key_for_table(table_obj)) for i in id_list]
+
+       if not isinstance(master_id, uuid.UUID):
+           master_id = getattr(master_id, get_primary_key_for_table(table_obj))
+
+       reason_list = ",".join([str(p) for p in id_list])
+       change_id = self.add_to_changes(
+           user=USER,
+           modified=datetime.utcnow(),
+           reason=f"Merging {table_name} '{reason_list}' to '{master_id}'.",
+       ).change_id
+
+       if master_id in id_list:
+           id_list.remove(master_id)  # We don't need to change these values
+
+       if table_name == constants.PLATFORM:
+           self.merge_platforms(id_list, master_id, change_id)
+       elif table_name in [constants.SENSOR, constants.DATAFILE]:
+           self.merge_measurements(table_name, id_list, master_id, change_id)
+       elif table_name in reference_table_names + [constants.TAG, constants.TASK]:
+           self.merge_objects(table_name, id_list, master_id, change_id)
+       else:
+           return False
+       return True
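For orientation, a hedged sketch of driving the new generic entry point, mirroring the merge_generic call in pepys_admin/maintenance/gui.py above. The DataStore connection arguments below are placeholders for illustration, not taken from this commit:

    # Sketch only: connection arguments are assumed, not from this commit.
    from pepys_import.core.store import constants
    from pepys_import.core.store.data_store import DataStore

    data_store = DataStore("", "", "", 0, "pepys.db", db_type="sqlite")  # assumed args

    with data_store.session_scope():
        platforms = data_store.session.query(data_store.db_classes.Platform).limit(3).all()
        master, *duplicates = platforms
        # merge_generic accepts objects or UUIDs: it extracts primary keys,
        # records a Changes row, then dispatches to merge_platforms above.
        merged = data_store.merge_generic(constants.PLATFORM, duplicates, master)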
pepys_import/core/store/sqlite_db.py (2 changes: 1 addition & 1 deletion)
@@ -55,7 +55,7 @@ class HostedBy(BaseSpatiaLite, HostedByMixin):
    hosted_from = Column(DATE, nullable=False)
    host_to = Column(DATE, nullable=False)
    privacy_id = Column(
-       Integer, ForeignKey("Privacies.privacy_id", onupdate="cascade"), nullable=False
+       UUIDType, ForeignKey("Privacies.privacy_id", onupdate="cascade"), nullable=False
    )
    created_date = Column(DateTime, default=datetime.utcnow)

requirements.txt (3 changes: 2 additions & 1 deletion)
@@ -15,4 +15,5 @@ setuptools>=40.8.0
Pygments>=2.6.1
geopy>=1.22
halo>=0.0.31
- loguru
+ loguru
+ sqlalchemy-utils==0.36.8
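The new sqlalchemy-utils pin supplies merge_references, which merge_objects above uses to repoint every foreign key at the surviving row before the duplicate is deleted. A minimal standalone sketch of that behaviour, with illustrative models that are not from this repository:

    # Illustrative models only; the pattern matches how merge_objects uses it.
    from sqlalchemy import Column, ForeignKey, Integer, create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy.orm import relationship, sessionmaker
    from sqlalchemy_utils import merge_references

    Base = declarative_base()


    class Author(Base):
        __tablename__ = "author"
        id = Column(Integer, primary_key=True)


    class Book(Base):
        __tablename__ = "book"
        id = Column(Integer, primary_key=True)
        author_id = Column(Integer, ForeignKey("author.id"))
        author = relationship(Author)


    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    session = sessionmaker(bind=engine)()

    master, duplicate = Author(), Author()
    book = Book(author=duplicate)
    session.add_all([master, duplicate, book])
    session.flush()  # assign primary keys

    merge_references(duplicate, master)  # book.author_id now points at master
    session.flush()
    assert book.author is master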