Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generic Merge #757

Merged
merged 18 commits into from
Feb 17, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .isort.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@ include_trailing_comma = true
force_grid_wrap = 0
use_parentheses = true
line_length = 100
known_third_party =alembic,dateutil,geoalchemy2,geopy,halo,iterfzf,loguru,pg8000,pint,prompt_toolkit,pyfiglet,pygments,pytest,setuptools,shapely,sqlalchemy,tabulate,testing,tqdm
known_third_party =alembic,dateutil,geoalchemy2,geopy,halo,iterfzf,loguru,pg8000,pint,prompt_toolkit,pyfiglet,pygments,pytest,setuptools,shapely,sqlalchemy,sqlalchemy_utils,tabulate,testing,tqdm
2 changes: 1 addition & 1 deletion migrations/latest_revisions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"LATEST_POSTGRES_VERSION": "1be821cb7908",
"LATEST_SQLITE_VERSION": "cb9a5f93f0aa"
"LATEST_SQLITE_VERSION": "a382a0f4b1be"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
"""Correct privacy_id type of HostedBy

Revision ID: a382a0f4b1be
Revises: cb9a5f93f0aa
Create Date: 2021-02-10 16:20:41.425779+00:00

"""
import sqlalchemy as sa
from alembic import op

import pepys_import

# revision identifiers, used by Alembic.
revision = "a382a0f4b1be"
down_revision = "cb9a5f93f0aa"
branch_labels = None
depends_on = None


def upgrade():
    """Convert HostedBy.privacy_id from INTEGER to a 16-byte UUIDType."""
    uuid_type = pepys_import.utils.sqlalchemy_utils.UUIDType(length=16)
    # Batch mode recreates the table under the hood, which SQLite needs
    # for column type changes.
    with op.batch_alter_table("HostedBy", schema=None) as hosted_by:
        hosted_by.alter_column(
            "privacy_id",
            existing_type=sa.INTEGER(),
            type_=uuid_type,
            existing_nullable=False,
        )


def downgrade():
    """Revert HostedBy.privacy_id back from UUIDType to INTEGER."""
    uuid_type = pepys_import.utils.sqlalchemy_utils.UUIDType(length=16)
    # Mirror of upgrade(): same batch rewrite, types swapped.
    with op.batch_alter_table("HostedBy", schema=None) as hosted_by:
        hosted_by.alter_column(
            "privacy_id",
            existing_type=uuid_type,
            type_=sa.INTEGER(),
            existing_nullable=False,
        )
113 changes: 106 additions & 7 deletions pepys_import/core/store/data_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from sqlalchemy.exc import ArgumentError, OperationalError
from sqlalchemy.orm import sessionmaker, undefer
from sqlalchemy.sql import func
from sqlalchemy_utils import merge_references
IanMayo marked this conversation as resolved.
Show resolved Hide resolved

from paths import PEPYS_IMPORT_DIRECTORY
from pepys_import import __version__
Expand All @@ -31,6 +32,7 @@

from ...utils.error_handling import handle_first_connection_error
from ...utils.sqlalchemy_utils import get_primary_key_for_table
from ...utils.table_name_utils import table_name_to_class_name
from ...utils.text_formatting_utils import custom_print_formatted_text, format_error_message
from .db_base import BasePostGIS, BaseSpatiaLite
from .db_status import TableTypes
Expand Down Expand Up @@ -1670,11 +1672,7 @@ def merge_platforms(self, platform_list, master_id) -> bool:
if isinstance(master_id, Platform):
master_id = master_id.platform_id

master_platform = (
self.session.query(Platform).filter(Platform.platform_id == master_id).scalar()
)
if not master_platform:
raise ValueError(f"No platform found with the given master_id: '{master_id}'!")
self._check_master_id(Platform, master_id)
master_sensor_names = self.session.query(Sensor.name).filter(Sensor.host == master_id).all()
master_sensor_names = set([n for (n,) in master_sensor_names])

Expand Down Expand Up @@ -1745,8 +1743,8 @@ def merge_platforms(self, platform_list, master_id) -> bool:
]
query.update({"sensor_id": master_sensor_id}) # Update Contacts

for p_id in platform_list: # Delete merged platforms
self.session.query(Platform).filter(Platform.platform_id == p_id).delete()
# Delete merged platforms
self._delete_merged_objects(Platform, platform_list)
self.session.flush()
return True

Expand Down Expand Up @@ -1784,3 +1782,104 @@ def update_platform_ids(self, merge_platform_id, master_platform_id, change_id):
except Exception:
pass
self.session.flush()

def _check_master_id(self, table_obj, master_id):
    """Fetch and return the row of *table_obj* whose primary key is *master_id*.

    Raises ValueError when no matching row exists, so merge operations fail
    fast on a bad master id instead of silently merging into nothing.
    """
    pk_column = getattr(table_obj, get_primary_key_for_table(table_obj))
    master_obj = self.session.query(table_obj).filter(pk_column == master_id).scalar()
    if not master_obj:
        raise ValueError(f"No object found with the given master_id: '{master_id}'!")
    return master_obj

def _delete_merged_objects(self, table_obj, id_list):
    """Bulk-delete the rows of *table_obj* whose primary keys appear in *id_list*."""
    pk_column = getattr(table_obj, get_primary_key_for_table(table_obj))
    matching = self.session.query(table_obj).filter(pk_column.in_(id_list))
    # "fetch" keeps the in-memory session consistent with the bulk DELETE.
    matching.delete(synchronize_session="fetch")
    self.session.flush()

def merge_measurements(self, table_name, id_list, master_id, change_id):
    """Re-point measurement rows from the objects in *id_list* to *master_id*.

    :param table_name: Either constants.SENSOR or constants.DATAFILE — the
        table whose rows are being merged away.
    :param id_list: IDs of the objects being merged into the master object.
    :param master_id: ID of the surviving (master) object.
    :param change_id: ID of the Changes row to attach log entries to.
    :raises ValueError: if *table_name* is not a mergeable table, or if no
        row exists for *master_id*.
    """
    if table_name == constants.SENSOR:
        table_obj = self.db_classes.Sensor
        # States/Contacts reference sensors via the sensor primary key.
        field = get_primary_key_for_table(table_obj)
        self._check_master_id(table_obj, master_id)
        table_objects = [self.db_classes.State, self.db_classes.Contact]
    elif table_name == constants.DATAFILE:
        table_obj = self.db_classes.Datafile
        # All measurement tables reference datafiles via source_id.
        field = "source_id"
        self._check_master_id(table_obj, master_id)
        table_objects = [
            self.db_classes.State,
            self.db_classes.Contact,
            self.db_classes.Activation,
            self.db_classes.LogsHolding,
            self.db_classes.Comment,
            self.db_classes.Geometry1,
            self.db_classes.Media,
        ]
    else:
        raise ValueError(
            f"You should give one of the following tables to merge measurements: "
            f"{constants.SENSOR}, {constants.DATAFILE}"
        )

    for t_obj in table_objects:
        query = self.session.query(t_obj).filter(getattr(t_obj, field).in_(id_list))
        # Log each affected row before the bulk update rewrites *field*.
        # NOTE(review): new_value here is the pre-update value of *field*
        # (not master_id) — confirm this is the intended Logs semantics.
        for s in query.all():
            self.add_to_logs(
                table=t_obj.__tablename__,
                row_id=getattr(s, field),
                field=field,
                new_value=str(getattr(s, field)),
                change_id=change_id,
            )
        query.update({field: master_id}, synchronize_session="fetch")

    # Delete merged objects
    self._delete_merged_objects(table_obj, id_list)

def merge_objects(self, table_name, id_list, master_id, change_id):
    """Merge reference-table rows in *id_list* into the row *master_id*.

    Uses sqlalchemy_utils.merge_references to re-point every foreign-key
    reference from each merged row to the master row, logs each merge, then
    deletes the merged rows.

    :param table_name: Plural database table name (e.g. "Privacies").
    :param id_list: IDs of the rows being merged away.
    :param master_id: ID of the surviving row.
    :param change_id: ID of the Changes row to attach log entries to.
    :raises ValueError: if no row exists for *master_id*.
    """
    # Table names are plural in the database, therefore make it singular
    table = table_name_to_class_name(table_name)
    table_obj = getattr(self.db_classes, table)
    to_obj = self._check_master_id(table_obj, master_id)

    # The primary-key name is a property of the table, not of each row:
    # compute it once rather than on every loop iteration.
    primary_key_field = get_primary_key_for_table(table_obj)
    for obj_id in id_list:
        from_obj = (
            self.session.query(table_obj)
            .filter(getattr(table_obj, primary_key_field) == obj_id)
            .scalar()
        )
        merge_references(from_obj, to_obj)
        # NOTE(review): new_value duplicates row_id (the merged row's own id,
        # not master_id) — confirm this is the intended Logs semantics.
        self.add_to_logs(
            table=table_obj.__tablename__,
            row_id=getattr(from_obj, primary_key_field),
            field=primary_key_field,
            new_value=str(obj_id),
            change_id=change_id,
        )
    self.session.flush()
    # Delete merged objects
    self._delete_merged_objects(table_obj, id_list)

def merge_generic(self, table_name, id_list, master_id) -> bool:
    """Record a Changes entry and dispatch the merge for *table_name*.

    Measurement-owning tables (Sensor, Datafile) go through
    merge_measurements; reference tables go through merge_objects.
    Returns True when a merge was performed, False for unsupported tables.
    """
    reference_table_names = [
        table.__tablename__ for table in self.meta_classes[TableTypes.REFERENCE]
    ]
    merged_ids = ",".join(str(obj_id) for obj_id in id_list)
    change = self.add_to_changes(
        user=USER,
        modified=datetime.utcnow(),
        reason=f"Merging {table_name} '{merged_ids}' to '{master_id}'.",
    )
    if table_name in (constants.SENSOR, constants.DATAFILE):
        self.merge_measurements(table_name, id_list, master_id, change.change_id)
        return True
    if table_name in reference_table_names:
        self.merge_objects(table_name, id_list, master_id, change.change_id)
        return True
    return False
2 changes: 1 addition & 1 deletion pepys_import/core/store/sqlite_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class HostedBy(BaseSpatiaLite, HostedByMixin):
hosted_from = Column(DATE, nullable=False)
host_to = Column(DATE, nullable=False)
privacy_id = Column(
Integer, ForeignKey("Privacies.privacy_id", onupdate="cascade"), nullable=False
UUIDType, ForeignKey("Privacies.privacy_id", onupdate="cascade"), nullable=False
)
created_date = Column(DateTime, default=datetime.utcnow)

Expand Down
3 changes: 2 additions & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,5 @@ setuptools>=40.8.0
Pygments>=2.6.1
geopy>=1.22
halo>=0.0.31
loguru
loguru
sqlalchemy-utils==0.36.8
Loading